[jira] [Assigned] (KAFKA-17619) Remove zk type and instance from ClusterTest
[ https://issues.apache.org/jira/browse/KAFKA-17619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-17619: -- Assignee: PoAn Yang (was: Chia-Ping Tsai) > Remove zk type and instance from ClusterTest > > > Key: KAFKA-17619 > URL: https://issues.apache.org/jira/browse/KAFKA-17619 > Project: Kafka > Issue Type: Sub-task >Reporter: Chia-Ping Tsai >Assignee: PoAn Yang >Priority: Major > > as title -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17116: New consumer may not send effective leave group if member ID received after close [kafka]
frankvicky closed pull request #16649: KAFKA-17116: New consumer may not send effective leave group if member ID received after close URL: https://github.com/apache/kafka/pull/16649 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17367: Share coordinator impl. New merge batches algorithm. [3/N] [kafka]
smjn commented on code in PR #17149: URL: https://github.com/apache/kafka/pull/17149#discussion_r1776430833 ## share-coordinator/src/main/java/org/apache/kafka/coordinator/share/StateBatchUtil.java: ## @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.share; + +import org.apache.kafka.coordinator.share.generated.ShareUpdateValue; +import org.apache.kafka.server.share.PersisterStateBatch; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.TreeSet; + +public class StateBatchUtil { +/** + * Util method which takes in 2 lists containing {@link PersisterStateBatch} + * and the startOffset. + * This method removes any batches where the lastOffset < startOffset, if the startOffset > -1. + * It then merges any contiguous intervals with same state. If states differ, + * based on various conditions it creates new non-overlapping batches preferring new ones. + * @param batchesSoFar - List containing current soft state of {@link PersisterStateBatch} + * @param newBatches - List containing {@link PersisterStateBatch} in incoming request + * @param startOffset - startOffset to consider when removing old batches. + * @return List containing combined batches + */ +public static List<PersisterStateBatch> combineStateBatches( +List<PersisterStateBatch> batchesSoFar, +List<PersisterStateBatch> newBatches, +long startOffset +) { +List<PersisterStateBatch> combinedList = new ArrayList<>(batchesSoFar.size() + newBatches.size()); +combinedList.addAll(batchesSoFar); +combinedList.addAll(newBatches); + +return mergeBatches( +pruneBatches( +combinedList, +startOffset +) +); +} + +/** + * Encapsulates the main merge algorithm. 
Consider 2 batches (A, B): + * - Same state (delivery count and state) + * - If overlapping - merge into single batch + * - If contiguous (A.lastOffset + 1 == B.firstOffset) - merge batches into a single one + * - Different state (delivery count or state differ) + * - Based on various cases: + * - swallow lower priority batch within bounds of offsets + * - break batch into other non-overlapping batches + * @param batches - List of {@link PersisterStateBatch} + * @return List of non-overlapping {@link PersisterStateBatch} + */ +private static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch> batches) { +if (batches.size() < 2) { +return batches; +} +TreeSet<PersisterStateBatch> sortedBatches = new TreeSet<>(batches); +List<PersisterStateBatch> finalBatches = new ArrayList<>(batches.size() * 2); // heuristic size + +BatchOverlapState overlapState = getOverlappingState(sortedBatches); + +while (overlapState != BatchOverlapState.SENTINEL) { +PersisterStateBatch last = overlapState.last(); +PersisterStateBatch candidate = overlapState.candidate(); + +// remove non overlapping prefix from sortedBatches, +// will make getting next overlapping pair efficient +// as a prefix batch which is non overlapping will only +// be checked once. +if (overlapState.nonOverlapping() != null) { +overlapState.nonOverlapping().forEach(sortedBatches::remove); +finalBatches.addAll(overlapState.nonOverlapping()); +} + +if (candidate == null) { +overlapState = BatchOverlapState.SENTINEL; +continue; +} + +// remove both last and candidate for easier +// assessment about adding batches to sortedBatches +sortedBatches.remove(last); +sortedBatches.remove(candidate); + +// overlap and same state (last.firstOffset <= candidate.firstOffset) due to sort +// covers: +// case:12 34 5 6 7 (contiguous) +// last:__ ______ ___ ___ ___ +// candidate:
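To make the same-state merge step in the javadoc above concrete, here is a minimal, self-contained sketch. The `Batch` record and `mergeSameState` method are hypothetical simplifications of `PersisterStateBatch` and `StateBatchUtil`, not the PR's code: overlapping or contiguous batches with equal state and delivery count collapse into one, while batches with differing state are left for the priority-based handling described above.
```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SameStateMergeSketch {
    // Simplified stand-in for PersisterStateBatch: an offset interval plus its state.
    record Batch(long firstOffset, long lastOffset, byte state, short deliveryCount) {
        boolean sameStateAs(Batch other) {
            return state == other.state && deliveryCount == other.deliveryCount;
        }
    }

    // Merges overlapping or contiguous batches that carry the same state.
    static List<Batch> mergeSameState(List<Batch> batches) {
        List<Batch> sorted = new ArrayList<>(batches);
        sorted.sort(Comparator.comparingLong(Batch::firstOffset)
                .thenComparingLong(Batch::lastOffset));
        List<Batch> result = new ArrayList<>();
        for (Batch b : sorted) {
            if (!result.isEmpty()) {
                Batch last = result.get(result.size() - 1);
                // Overlapping, or contiguous (last.lastOffset + 1 == b.firstOffset), with same state.
                if (last.sameStateAs(b) && b.firstOffset <= last.lastOffset + 1) {
                    result.set(result.size() - 1,
                            new Batch(last.firstOffset,
                                    Math.max(last.lastOffset, b.lastOffset),
                                    last.state, last.deliveryCount));
                    continue;
                }
            }
            result.add(b);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Batch> in = List.of(
                new Batch(1, 10, (byte) 0, (short) 1),
                new Batch(11, 20, (byte) 0, (short) 1),   // contiguous, same state -> merged
                new Batch(25, 30, (byte) 2, (short) 1));  // different state -> kept separate
        // Prints the merged [1,20] batch followed by the untouched [25,30] batch.
        mergeSameState(in).forEach(System.out::println);
    }
}
```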
Re: [PR] KAFKA-17367: Share coordinator impl. New merge batches algorithm. [3/N] [kafka]
smjn commented on code in PR #17149: URL: https://github.com/apache/kafka/pull/17149#discussion_r1776567400 ## share-coordinator/src/main/java/org/apache/kafka/coordinator/share/StateBatchUtil.java: ## @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.share; + +import org.apache.kafka.coordinator.share.generated.ShareUpdateValue; +import org.apache.kafka.server.share.PersisterStateBatch; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.TreeSet; + +public class StateBatchUtil { +/** + * Util method which takes in 2 lists containing {@link PersisterStateBatch} + * and the startOffset. + * This method removes any batches where the lastOffset < startOffset, if the startOffset > -1. + * It then merges any contiguous intervals with same state. If states differ, + * based on various conditions it creates new non-overlapping batches preferring new ones. + * @param batchesSoFar - List containing current soft state of {@link PersisterStateBatch} + * @param newBatches - List containing {@link PersisterStateBatch} in incoming request + * @param startOffset - startOffset to consider when removing old batches. + * @return List containing combined batches + */ +public static List<PersisterStateBatch> combineStateBatches( +List<PersisterStateBatch> batchesSoFar, +List<PersisterStateBatch> newBatches, +long startOffset +) { +List<PersisterStateBatch> combinedList = new ArrayList<>(batchesSoFar.size() + newBatches.size()); +combinedList.addAll(batchesSoFar); +combinedList.addAll(newBatches); + +return mergeBatches( +pruneBatches( +combinedList, +startOffset +) +); +} + +/** + * Encapsulates the main merge algorithm. 
Consider 2 batches (A, B): + * - Same state (delivery count and state) + * - If overlapping - merge into single batch + * - If contiguous (A.lastOffset + 1 == B.firstOffset) - merge batches into a single one + * - Different state (delivery count or state differ) + * - Based on various cases: + * - swallow lower priority batch within bounds of offsets + * - break batch into other non-overlapping batches + * @param batches - List of {@link PersisterStateBatch} + * @return List of non-overlapping {@link PersisterStateBatch} + */ +private static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch> batches) { +if (batches.size() < 2) { +return batches; +} +TreeSet<PersisterStateBatch> sortedBatches = new TreeSet<>(batches); +List<PersisterStateBatch> finalBatches = new ArrayList<>(batches.size() * 2); // heuristic size + +BatchOverlapState overlapState = getOverlappingState(sortedBatches); + +while (overlapState != BatchOverlapState.SENTINEL) { +PersisterStateBatch last = overlapState.last(); +PersisterStateBatch candidate = overlapState.candidate(); + +// remove non overlapping prefix from sortedBatches, +// will make getting next overlapping pair efficient +// as a prefix batch which is non overlapping will only +// be checked once. +if (overlapState.nonOverlapping() != null) { +overlapState.nonOverlapping().forEach(sortedBatches::remove); +finalBatches.addAll(overlapState.nonOverlapping()); +} + +if (candidate == null) { +overlapState = BatchOverlapState.SENTINEL; +continue; +} + +// remove both last and candidate for easier +// assessment about adding batches to sortedBatches +sortedBatches.remove(last); +sortedBatches.remove(candidate); + +// overlap and same state (last.firstOffset <= candidate.firstOffset) due to sort +// covers: +// case:12 34 5 6 7 (contiguous) +// last:__ ______ ___ ___ ___ +// candidate:
[jira] [Commented] (KAFKA-17547) Write large number of mirror maker logs when Kafka crashes
[ https://issues.apache.org/jira/browse/KAFKA-17547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884911#comment-17884911 ] George Yang commented on KAFKA-17547: - Could someone who has a deep understanding of Mirror Maker 2 please take a look? > Write large number of mirror maker logs when Kafka crashes > -- > > Key: KAFKA-17547 > URL: https://issues.apache.org/jira/browse/KAFKA-17547 > Project: Kafka > Issue Type: Task > Components: mirrormaker >Affects Versions: 3.7.1 > Environment: OS: AlmaLinux 9.3 > CPU: 28cores > Mem: 128GiB > Kafka: v3.7.0 > MirrorMaker2: v3.7.1 >Reporter: George Yang >Priority: Blocker > Attachments: connect.log > > Original Estimate: 48h > Remaining Estimate: 48h > > We have deployed 2 data centers, each with one node, and on each node, there > is a Kafka pod and a MirrorMaker2 pod. Currently, if one of the Kafka pods > crashes, the running MirrorMaker2 pod will continuously output a large number > of logs (please see the attachment). As long as the Kafka pod remains down, > the logs will keep accumulating, eventually filling up the disk. Besides > stopping all the MirrorMaker2 instances or getting the crashed Kafka pod back > online, are there any other solutions to prevent this excessive logging? For > example, can we configure any parameters in MirrorMaker2 to handle this? -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17367: Share coordinator impl. New merge batches algorithm. [3/N] [kafka]
smjn commented on code in PR #17149: URL: https://github.com/apache/kafka/pull/17149#discussion_r1776624490 ## share-coordinator/src/main/java/org/apache/kafka/coordinator/share/StateBatchUtil.java: ## @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.share; + +import org.apache.kafka.coordinator.share.generated.ShareUpdateValue; +import org.apache.kafka.server.share.PersisterStateBatch; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.TreeSet; + +public class StateBatchUtil { +/** + * Util method which takes in 2 lists containing {@link PersisterStateBatch} + * and the startOffset. + * This method removes any batches where the lastOffset < startOffset, if the startOffset > -1. + * It then merges any contiguous intervals with same state. If states differ, + * based on various conditions it creates new non-overlapping batches preferring new ones. + * @param batchesSoFar - List containing current soft state of {@link PersisterStateBatch} + * @param newBatches - List containing {@link PersisterStateBatch} in incoming request + * @param startOffset - startOffset to consider when removing old batches. + * @return List containing combined batches + */ +public static List<PersisterStateBatch> combineStateBatches( +List<PersisterStateBatch> batchesSoFar, +List<PersisterStateBatch> newBatches, +long startOffset +) { +List<PersisterStateBatch> combinedList = new ArrayList<>(batchesSoFar.size() + newBatches.size()); +combinedList.addAll(batchesSoFar); +combinedList.addAll(newBatches); + +return mergeBatches( +pruneBatches( +combinedList, +startOffset +) +); +} + +/** + * Encapsulates the main merge algorithm. 
Consider 2 batches (A, B): + * - Same state (delivery count and state) + * - If overlapping - merge into single batch + * - If contiguous (A.lastOffset + 1 == B.firstOffset) - merge batches into a single one + * - Different state (delivery count or state differ) + * - Based on various cases: + * - swallow lower priority batch within bounds of offsets + * - break batch into other non-overlapping batches + * @param batches - List of {@link PersisterStateBatch} + * @return List of non-overlapping {@link PersisterStateBatch} + */ +private static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch> batches) { +if (batches.size() < 2) { +return batches; +} +TreeSet<PersisterStateBatch> sortedBatches = new TreeSet<>(batches); +List<PersisterStateBatch> finalBatches = new ArrayList<>(batches.size() * 2); // heuristic size + +BatchOverlapState overlapState = getOverlappingState(sortedBatches); + +while (overlapState != BatchOverlapState.SENTINEL) { +PersisterStateBatch last = overlapState.last(); +PersisterStateBatch candidate = overlapState.candidate(); + +// remove non overlapping prefix from sortedBatches, +// will make getting next overlapping pair efficient +// as a prefix batch which is non overlapping will only +// be checked once. +if (overlapState.nonOverlapping() != null) { +overlapState.nonOverlapping().forEach(sortedBatches::remove); +finalBatches.addAll(overlapState.nonOverlapping()); +} + +if (candidate == null) { +overlapState = BatchOverlapState.SENTINEL; +continue; +} + +// remove both last and candidate for easier +// assessment about adding batches to sortedBatches +sortedBatches.remove(last); +sortedBatches.remove(candidate); + +// overlap and same state (last.firstOffset <= candidate.firstOffset) due to sort +// covers: +// case:12 34 5 6 7 (contiguous) +// last:__ ______ ___ ___ ___ +// candidate:
Re: [PR] KAFKA-15931: Reopen TransactionIndex if channel is closed [kafka]
mjd95 commented on PR #15241: URL: https://github.com/apache/kafka/pull/15241#issuecomment-2376460752 We were the ones discussing with @jeqo - the "caching closed channels" issue was happening regularly for us on 3.8 in production: when a thread doing a remote read was interrupted while iterating through the transaction index, we got a `ClosedByInterruptException` on a transaction index file channel, but the closed channel remained in the cache. The only way to mitigate was restarting the broker. We were able to reproduce by setting a low `remote.fetch.max.wait.ms` and a small segment size in order to generate many transaction index files. We verified that this PR fixes our repro and cherry-picked it onto our production release; we haven't seen the issue since. We haven't seen the "race during channel close" issue (which is now also handled in this PR) in production. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
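For readers who want the shape of the fix: the general retry-on-closed-channel idea looks roughly like the sketch below. The class and method names are hypothetical illustrations of the approach, not the actual change in #15241, and the sketch deliberately ignores re-asserting the interrupt status after a `ClosedByInterruptException`.
```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ReopeningIndexReader {
    private final Path file;
    private volatile FileChannel channel;

    public ReopeningIndexReader(Path file) throws IOException {
        this.file = file;
        this.channel = FileChannel.open(file, StandardOpenOption.READ);
    }

    // Reads at 'position', transparently reopening the channel if an earlier
    // interrupt left a closed channel cached (the failure mode described above).
    // ClosedByInterruptException is a subclass of ClosedChannelException, so
    // both the interrupted thread and later readers land in the catch block.
    public int read(ByteBuffer dst, long position) throws IOException {
        try {
            return channel.read(dst, position);
        } catch (ClosedChannelException e) {
            synchronized (this) {
                if (!channel.isOpen()) {
                    channel = FileChannel.open(file, StandardOpenOption.READ);
                }
            }
            return channel.read(dst, position); // retry once on the fresh channel
        }
    }
}
```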
[jira] [Commented] (KAFKA-17617) New GitHub Actions build builds Java 8 with 2.13 instead of 2.12
[ https://issues.apache.org/jira/browse/KAFKA-17617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884912#comment-17884912 ] Chia-Ping Tsai commented on KAFKA-17617: Scala 2.12 will be dropped in 4.0 (KAFKA-12895), so I think the best solution is to start the process now. > New GitHub Actions build builds Java 8 with 2.13 instead of 2.12 > > > Key: KAFKA-17617 > URL: https://issues.apache.org/jira/browse/KAFKA-17617 > Project: Kafka > Issue Type: Task >Reporter: Justine Olshan >Priority: Critical > > I noticed a PR that failed for Jenkins but not for the GitHub Actions build. > Tracing it down, it looks like Jenkins was using Scala 2.12 and GitHub > actions is using 2.13. > We still should support 2.12, so we should fix the GitHub actions now that > Jenkins is removed. Until we fix, folks can merge in code that breaks 2.12 > builds. > See Jenkins for > failure:[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-17093/5/cloudbees-pipeline-explorer/?filter=37] > see for success in GH Actions: > [https://github.com/apache/kafka/actions/runs/11039342155/job/30672276597?pr=17093] > see raw build for 2.13: > [https://productionresultssa2.blob.core.windows.net/actions-results/4f08a774-9253-4d15-8617-a627e9961b76/workflow-job-run-716dd2a8-3073-58cf-0d26-4a389b46b592/logs/job/job-logs.txt?rsct=text%2Fplain&se=2024-09-26T01%3A05%3A04Z&sig=klIXSOTwKN9WrBvtdsN6j45DbSqg7ikwow%2FGETJy5pc%3D&ske=2024-09-26T12%3A26%3A46Z&skoid=ca7593d4-ee42-46cd-af88-8b886a2f84eb&sks=b&skt=2024-09-26T00%3A26%3A46Z&sktid=398a6654-997b-47e9-b12b-9515b896b4de&skv=2024-05-04&sp=r&spr=https&sr=b&st=2024-09-26T00%3A54%3A59Z&sv=2024-05-04] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-12895) KIP-751: Drop support for Scala 2.12 in Kafka 4.0 (deprecate in 3.0)
[ https://issues.apache.org/jira/browse/KAFKA-12895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884917#comment-17884917 ] Chia-Ping Tsai commented on KAFKA-12895: [~frankvicky] Please review the KIP ([https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308218]) before submitting the patch, thanks > KIP-751: Drop support for Scala 2.12 in Kafka 4.0 (deprecate in 3.0) > > > Key: KAFKA-12895 > URL: https://issues.apache.org/jira/browse/KAFKA-12895 > Project: Kafka > Issue Type: Improvement >Reporter: Ismael Juma >Assignee: TengYao Chi >Priority: Major > Labels: kip > Fix For: 4.0.0 > > > We propose to deprecate Scala 2.12 support in Apache Kafka 3.0 and to drop it > in Apache Kafka 4.0. > > KIP: > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308218 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17606) Include Rat errors in GitHub workflow summary
[ https://issues.apache.org/jira/browse/KAFKA-17606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884919#comment-17884919 ] Chia-Ping Tsai commented on KAFKA-17606: [~loganzhu] I have assigned this Jira to you, thanks! > Include Rat errors in GitHub workflow summary > - > > Key: KAFKA-17606 > URL: https://issues.apache.org/jira/browse/KAFKA-17606 > Project: Kafka > Issue Type: Improvement > Components: build >Reporter: David Arthur >Assignee: Logan Zhu >Priority: Minor > > It is difficult to scroll through the output of "check -x test" to find Rat > errors. There is a summary printed at the end of the Gradle process, but it > just says something like > > {code:java} > * What went wrong: > Execution failed for task ':rat'. > > A failure occurred while executing org.nosphere.apache.rat.RatWork > > Apache Rat audit failure - 2 unapproved licenses > See file:///home/runner/work/kafka/kafka/build/rat/index.html {code} > We should include specific Rat failures in the job summary similar to what we > do for checkstyle failures. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-17606) Include Rat errors in GitHub workflow summary
[ https://issues.apache.org/jira/browse/KAFKA-17606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-17606: -- Assignee: Logan Zhu > Include Rat errors in GitHub workflow summary > - > > Key: KAFKA-17606 > URL: https://issues.apache.org/jira/browse/KAFKA-17606 > Project: Kafka > Issue Type: Improvement > Components: build >Reporter: David Arthur >Assignee: Logan Zhu >Priority: Minor > > It is difficult to scroll through the output of "check -x test" to find Rat > errors. There is a summary printed at the end of the Gradle process, but it > just says something like > > {code:java} > * What went wrong: > Execution failed for task ':rat'. > > A failure occurred while executing org.nosphere.apache.rat.RatWork > > Apache Rat audit failure - 2 unapproved licenses > See file:///home/runner/work/kafka/kafka/build/rat/index.html {code} > We should include specific Rat failures in the job summary similar to what we > do for checkstyle failures. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17367: Share coordinator impl. New merge batches algorithm. [3/N] [kafka]
smjn commented on code in PR #17149: URL: https://github.com/apache/kafka/pull/17149#discussion_r1776452137 ## share-coordinator/src/main/java/org/apache/kafka/coordinator/share/StateBatchUtil.java: ## @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.share; + +import org.apache.kafka.coordinator.share.generated.ShareUpdateValue; +import org.apache.kafka.server.share.PersisterStateBatch; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.TreeSet; + +public class StateBatchUtil { +/** + * Util method which takes in 2 lists containing {@link PersisterStateBatch} + * and the startOffset. + * This method removes any batches where the lastOffset < startOffset, if the startOffset > -1. + * It then merges any contiguous intervals with same state. If states differ, + * based on various conditions it creates new non-overlapping batches preferring new ones. + * @param batchesSoFar - List containing current soft state of {@link PersisterStateBatch} + * @param newBatches - List containing {@link PersisterStateBatch} in incoming request + * @param startOffset - startOffset to consider when removing old batches. + * @return List containing combined batches + */ +public static List<PersisterStateBatch> combineStateBatches( +List<PersisterStateBatch> batchesSoFar, +List<PersisterStateBatch> newBatches, +long startOffset +) { +List<PersisterStateBatch> combinedList = new ArrayList<>(batchesSoFar.size() + newBatches.size()); +combinedList.addAll(batchesSoFar); +combinedList.addAll(newBatches); + +return mergeBatches( +pruneBatches( +combinedList, +startOffset +) +); +} + +/** + * Encapsulates the main merge algorithm. Consider 2 batches (A, B): + * - Same state (delivery count and state) + * - If overlapping - merge into single batch + * - If contiguous (A.lastOffset + 1 == B.firstOffset) - merge batches into a single one + * - Different state (delivery count or state differ) + * - Based on various cases: + * - swallow lower priority batch within bounds of offsets + * - break batch into other non-overlapping batches + * @param batches - List of {@link PersisterStateBatch} + * @return List of non-overlapping {@link PersisterStateBatch} + */ +private static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch> batches) { +if (batches.size() < 2) { +return batches; +} +TreeSet<PersisterStateBatch> sortedBatches = new TreeSet<>(batches); +List<PersisterStateBatch> finalBatches = new ArrayList<>(batches.size() * 2); // heuristic size + +BatchOverlapState overlapState = getOverlappingState(sortedBatches); + +while (overlapState != BatchOverlapState.SENTINEL) { Review Comment: No, the logic is not that simple. A single iteration over the sorted batches might not be enough. 
The invariant is that the batches remain sorted even after manipulation. Consider: ``` A [1,10,0,1] B [5,7,0,2] C [5,15,0,3] ``` A and B will combine to ``` [1,4,0,1] [5,7,0,2] [8,10,0,1] ``` Now when combining with C, we have 2 previous batches to consider. Secondly, ``` A [1,10,0,1] B [5,7,0,2] C [5,7,0,3] ``` A and B will combine to ``` [1,4,0,1] [5,7,0,2] [8,10,0,1] <- C - we broke the invariant of being sorted by batches ``` In the current impl, these situations are implicitly handled by virtue of the treeset. Any newly generated batches are pushed back into the treeset and the `getOverlappingState` method finds the first overlapping pair as well as returns the non-overlapping prefix. The non-overlapping prefix is then REMOVED from the treeset; hence, once a batch is no longer overlapping, it is not examined again.
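A small sketch of the split that produces the [1,4], [5,7], [8,10] pieces in the example above, assuming a hypothetical `Batch` record (firstOffset, lastOffset, state, deliveryCount) rather than the PR's `PersisterStateBatch`:
```java
import java.util.ArrayList;
import java.util.List;

public class BatchSplitSketch {
    record Batch(long firstOffset, long lastOffset, byte state, short deliveryCount) { }

    // Splits lower-priority batch 'a' around higher-priority batch 'b',
    // assuming b is fully contained in a (a.firstOffset <= b.firstOffset
    // and b.lastOffset <= a.lastOffset). b's range wins; the remainders
    // of a survive with their original state.
    static List<Batch> splitAround(Batch a, Batch b) {
        List<Batch> pieces = new ArrayList<>();
        if (a.firstOffset() < b.firstOffset()) {  // prefix of a before b
            pieces.add(new Batch(a.firstOffset(), b.firstOffset() - 1, a.state(), a.deliveryCount()));
        }
        pieces.add(b);                            // b swallows the overlapping range
        if (b.lastOffset() < a.lastOffset()) {    // suffix of a after b
            pieces.add(new Batch(b.lastOffset() + 1, a.lastOffset(), a.state(), a.deliveryCount()));
        }
        return pieces;
    }

    public static void main(String[] args) {
        Batch a = new Batch(1, 10, (byte) 0, (short) 1);
        Batch b = new Batch(5, 7, (byte) 0, (short) 2);
        // Yields [1,4] count=1, [5,7] count=2, [8,10] count=1 - matching the example.
        splitAround(a, b).forEach(System.out::println);
    }
}
```
As the comment explains, such newly generated pieces must be pushed back into the sorted set, since they may themselves overlap with later batches.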
[jira] [Commented] (KAFKA-12895) KIP-751: Drop support for Scala 2.12 in Kafka 4.0 (deprecate in 3.0)
[ https://issues.apache.org/jira/browse/KAFKA-12895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884913#comment-17884913 ] Chia-Ping Tsai commented on KAFKA-12895: Our new CI has already dropped support for Scala 2.12, so I'll begin the process. > KIP-751: Drop support for Scala 2.12 in Kafka 4.0 (deprecate in 3.0) > > > Key: KAFKA-12895 > URL: https://issues.apache.org/jira/browse/KAFKA-12895 > Project: Kafka > Issue Type: Improvement >Reporter: Ismael Juma >Assignee: Ismael Juma >Priority: Major > Labels: kip > Fix For: 4.0.0 > > > We propose to deprecate Scala 2.12 support in Apache Kafka 3.0 and to drop it > in Apache Kafka 4.0. > > KIP: > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308218 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-12895) KIP-751: Drop support for Scala 2.12 in Kafka 4.0 (deprecate in 3.0)
[ https://issues.apache.org/jira/browse/KAFKA-12895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-12895: -- Assignee: TengYao Chi (was: Ismael Juma) > KIP-751: Drop support for Scala 2.12 in Kafka 4.0 (deprecate in 3.0) > > > Key: KAFKA-12895 > URL: https://issues.apache.org/jira/browse/KAFKA-12895 > Project: Kafka > Issue Type: Improvement >Reporter: Ismael Juma >Assignee: TengYao Chi >Priority: Major > Labels: kip > Fix For: 4.0.0 > > > We propose to deprecate Scala 2.12 support in Apache Kafka 3.0 and to drop it > in Apache Kafka 4.0. > > KIP: > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308218 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17587) Move test infrastructure out of core
[ https://issues.apache.org/jira/browse/KAFKA-17587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884923#comment-17884923 ] Chia-Ping Tsai commented on KAFKA-17587: [~davidarthur] I opened https://issues.apache.org/jira/browse/KAFKA-17619 to remove the zk type and instance > Move test infrastructure out of core > > > Key: KAFKA-17587 > URL: https://issues.apache.org/jira/browse/KAFKA-17587 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: David Arthur >Priority: Major > > Currently, our integration test infrastructure exists in the ":core" module's > test sources. This means that any integration test must exist in > "core/src/test/java" or "core/src/test/scala". > This has two negative consequences. First, it means most of our integration > tests live in "core" which is why that module's test time is by far the > highest. The other related problem is that modules cannot easily define > integration tests in their directory due to circularity with the core test > dependency. > For example, ":metadata" could not add ClusterTests because that would > require a dependency on "project(':core').sourceSets.test.output" – but this > can't happen because ":core" depends on ":metadata". > We should refactor our test infrastructure classes so that we can untangle > these dependencies. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-17619) Remove zk type and instance from ClusterTest
Chia-Ping Tsai created KAFKA-17619: -- Summary: Remove zk type and instance from ClusterTest Key: KAFKA-17619 URL: https://issues.apache.org/jira/browse/KAFKA-17619 Project: Kafka Issue Type: Sub-task Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai as title -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-17500: Metadata redirection for NOT_LEADER_OR_FOLLOWER [kafka]
AndrewJSchofield opened a new pull request, #17279: URL: https://github.com/apache/kafka/pull/17279 This PR implements the metadata redirection feature of the ShareFetch and ShareAcknowledge responses where an error code of NOT_LEADER_OR_FOLLOWER or FENCED_LEADER_EPOCH along with current leader information in the response is used to optimise handling of leadership changes in the client. This is applying the logic of KIP-951 to share group consumers. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
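As background, the KIP-951-style client handling the PR applies to share consumers works roughly as follows: if a response carries NOT_LEADER_OR_FOLLOWER or FENCED_LEADER_EPOCH together with the new leader's id and epoch, the client updates its cached metadata immediately and retries, instead of waiting for a full metadata refresh. Below is a hedged sketch with hypothetical types, not the PR's code; the numeric error codes shown follow the Kafka protocol's Errors enum.
```java
import java.util.Optional;

public class LeaderRedirectSketch {
    record LeaderInfo(int leaderId, int leaderEpoch) { }
    record PartitionResponse(short errorCode, Optional<LeaderInfo> currentLeader) { }

    static final short NOT_LEADER_OR_FOLLOWER = 6;
    static final short FENCED_LEADER_EPOCH = 74;

    // On a retriable leadership error, apply the piggybacked leader info
    // immediately so the next request goes to the right broker.
    static boolean maybeRedirect(PartitionResponse resp, MetadataCache cache, String topic, int partition) {
        boolean leadershipError = resp.errorCode() == NOT_LEADER_OR_FOLLOWER
                || resp.errorCode() == FENCED_LEADER_EPOCH;
        if (leadershipError && resp.currentLeader().isPresent()) {
            LeaderInfo leader = resp.currentLeader().get();
            cache.updateLeader(topic, partition, leader.leaderId(), leader.leaderEpoch());
            return true; // caller can retry against the new leader right away
        }
        return leadershipError; // no leader hint: fall back to a metadata refresh
    }

    interface MetadataCache {
        void updateLeader(String topic, int partition, int leaderId, int leaderEpoch);
    }
}
```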
[PR] KAFKA-17606: Include Rat errors in GitHub workflow summary [kafka]
LoganZhuZzz opened a new pull request, #17280: URL: https://github.com/apache/kafka/pull/17280 In the current GitHub workflow, it has become difficult to locate Rat errors when running `check -x test`. To improve readability and make it easier to identify Rat errors, we need to include them in the workflow summary. ## Key Modifications - Added a call to `rat.py` in the `build.yml` file to annotate Rat errors after the `Compile and validate` step. - Ensured that the annotation for Rat reports will execute **only when the `Compile and validate` step fails.** ## Purpose - Enhance the usability of the GitHub workflow, allowing developers to more easily identify and fix Rat errors. - Improve workflow efficiency by reducing the time spent manually searching for errors. ## Notes - **Please ensure that all relevant workflow tests are run before merging this PR to confirm that the new feature operates correctly.** ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17606: Include Rat errors in GitHub workflow summary [kafka]
LoganZhuZzz commented on PR #17280: URL: https://github.com/apache/kafka/pull/17280#issuecomment-2376561401 @chia7712 please take a look, thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16792: Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0) [kafka]
lianetm commented on code in PR #16982: URL: https://github.com/apache/kafka/pull/16982#discussion_r1776792978 ## clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java: ## @@ -2486,68 +2492,96 @@ public void testCurrentLag(GroupProtocol groupProtocol) { // poll once to update with the current metadata consumer.poll(Duration.ofMillis(0)); +TestUtils.waitForCondition(() -> requestGenerated(client, ApiKeys.FIND_COORDINATOR), +"No metadata requests sent"); client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, metadata.fetch().nodes().get(0))); // no error for no current position assertEquals(OptionalLong.empty(), consumer.currentLag(tp0)); -assertEquals(0, client.inFlightRequestCount()); - +if (groupProtocol == GroupProtocol.CLASSIC) { +// Classic consumer does not send the LIST_OFFSETS right away (requires an explicit poll), +// different from the new async consumer, that will send the LIST_OFFSETS request in the background thread +// on the next background thread poll. +assertEquals(0, client.inFlightRequestCount()); +} // poll once again, which should send the list-offset request consumer.seek(tp0, 50L); consumer.poll(Duration.ofMillis(0)); // requests: list-offset, fetch -assertEquals(2, client.inFlightRequestCount()); +TestUtils.waitForCondition(() -> { +boolean hasListOffsetRequest = requestGenerated(client, ApiKeys.LIST_OFFSETS); +boolean hasFetchRequest = requestGenerated(client, ApiKeys.FETCH); +return hasListOffsetRequest && hasFetchRequest; +}, "No list-offset & fetch request sent"); // no error for no end offset (so unknown lag) assertEquals(OptionalLong.empty(), consumer.currentLag(tp0)); // poll once again, which should return the list-offset response // and hence next call would return correct lag result -client.respond(listOffsetsResponse(singletonMap(tp0, 90L))); +ClientRequest listOffsetRequest = findRequest(client, ApiKeys.LIST_OFFSETS); +client.respondToRequest(listOffsetRequest, listOffsetsResponse(singletonMap(tp0, 90L))); consumer.poll(Duration.ofMillis(0)); -assertEquals(OptionalLong.of(40L), consumer.currentLag(tp0)); +// For AsyncKafkaConsumer, subscription sate is updated in background, so the result will eventually be updated. +TestUtils.waitForCondition(() -> { +OptionalLong result = consumer.currentLag(tp0); +return result.isPresent() && result.getAsLong() == 40L; +}, "Subscription state is not updated"); // requests: fetch -assertEquals(1, client.inFlightRequestCount()); +TestUtils.waitForCondition(() -> requestGenerated(client, ApiKeys.FETCH), "No fetch request sent"); // one successful fetch should update the log end offset and the position +ClientRequest fetchRequest = findRequest(client, ApiKeys.FETCH); final FetchInfo fetchInfo = new FetchInfo(1L, 99L, 50L, 5); -client.respond(fetchResponse(singletonMap(tp0, fetchInfo))); +client.respondToRequest(fetchRequest, fetchResponse(singletonMap(tp0, fetchInfo))); final ConsumerRecords records = (ConsumerRecords) consumer.poll(Duration.ofMillis(1)); assertEquals(5, records.count()); assertEquals(55L, consumer.position(tp0)); // correct lag result -assertEquals(OptionalLong.of(45L), consumer.currentLag(tp0)); +// For AsyncKafkaConsumer, subscription sate is updated in background, so the result will eventually be updated. 
Review Comment: typo state ## clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java: ## @@ -2486,68 +2492,96 @@ public void testCurrentLag(GroupProtocol groupProtocol) { // poll once to update with the current metadata consumer.poll(Duration.ofMillis(0)); +TestUtils.waitForCondition(() -> requestGenerated(client, ApiKeys.FIND_COORDINATOR), +"No metadata requests sent"); client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, metadata.fetch().nodes().get(0))); // no error for no current position assertEquals(OptionalLong.empty(), consumer.currentLag(tp0)); -assertEquals(0, client.inFlightRequestCount()); - +if (groupProtocol == GroupProtocol.CLASSIC) { +// Classic consumer does not send the LIST_OFFSETS right away (requires an explicit poll), +// different from the new async consumer, that will send the LIST_OFFSETS request in the background thread +// on the next background thread poll. +assertEquals(0, client.inFlightRequestCount()); +
Re: [PR] KAFKA-16792: Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0) [kafka]
lianetm commented on code in PR #16982: URL: https://github.com/apache/kafka/pull/16982#discussion_r1776796286 ## clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java: ## @@ -2486,68 +2492,96 @@ public void testCurrentLag(GroupProtocol groupProtocol) { // poll once to update with the current metadata consumer.poll(Duration.ofMillis(0)); +TestUtils.waitForCondition(() -> requestGenerated(client, ApiKeys.FIND_COORDINATOR), +"No metadata requests sent"); client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, metadata.fetch().nodes().get(0))); // no error for no current position assertEquals(OptionalLong.empty(), consumer.currentLag(tp0)); -assertEquals(0, client.inFlightRequestCount()); - +if (groupProtocol == GroupProtocol.CLASSIC) { +// Classic consumer does not send the LIST_OFFSETS right away (requires an explicit poll), +// different from the new async consumer, that will send the LIST_OFFSETS request in the background thread +// on the next background thread poll. +assertEquals(0, client.inFlightRequestCount()); +} // poll once again, which should send the list-offset request consumer.seek(tp0, 50L); consumer.poll(Duration.ofMillis(0)); // requests: list-offset, fetch -assertEquals(2, client.inFlightRequestCount()); +TestUtils.waitForCondition(() -> { +boolean hasListOffsetRequest = requestGenerated(client, ApiKeys.LIST_OFFSETS); +boolean hasFetchRequest = requestGenerated(client, ApiKeys.FETCH); +return hasListOffsetRequest && hasFetchRequest; +}, "No list-offset & fetch request sent"); // no error for no end offset (so unknown lag) assertEquals(OptionalLong.empty(), consumer.currentLag(tp0)); // poll once again, which should return the list-offset response // and hence next call would return correct lag result -client.respond(listOffsetsResponse(singletonMap(tp0, 90L))); +ClientRequest listOffsetRequest = findRequest(client, ApiKeys.LIST_OFFSETS); +client.respondToRequest(listOffsetRequest, listOffsetsResponse(singletonMap(tp0, 90L))); consumer.poll(Duration.ofMillis(0)); -assertEquals(OptionalLong.of(40L), consumer.currentLag(tp0)); +// For AsyncKafkaConsumer, subscription sate is updated in background, so the result will eventually be updated. +TestUtils.waitForCondition(() -> { +OptionalLong result = consumer.currentLag(tp0); +return result.isPresent() && result.getAsLong() == 40L; +}, "Subscription state is not updated"); // requests: fetch -assertEquals(1, client.inFlightRequestCount()); +TestUtils.waitForCondition(() -> requestGenerated(client, ApiKeys.FETCH), "No fetch request sent"); // one successful fetch should update the log end offset and the position +ClientRequest fetchRequest = findRequest(client, ApiKeys.FETCH); final FetchInfo fetchInfo = new FetchInfo(1L, 99L, 50L, 5); -client.respond(fetchResponse(singletonMap(tp0, fetchInfo))); +client.respondToRequest(fetchRequest, fetchResponse(singletonMap(tp0, fetchInfo))); final ConsumerRecords records = (ConsumerRecords) consumer.poll(Duration.ofMillis(1)); assertEquals(5, records.count()); assertEquals(55L, consumer.position(tp0)); // correct lag result -assertEquals(OptionalLong.of(45L), consumer.currentLag(tp0)); +// For AsyncKafkaConsumer, subscription sate is updated in background, so the result will eventually be updated. +TestUtils.waitForCondition(() -> { Review Comment: Is this change really needed? In this case we just did a successful fetch, so position is updated to 55 (ln 2541). We should be able to retrieve the lag of 45 (end offsets is already known to be 100). 
(This is not exactly the same case as above, where we needed to allow for the ListOffsets response to be processed in the background.) Makes sense? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-17620) Simplify share partition acquire API
Apoorv Mittal created KAFKA-17620: - Summary: Simplify share partition acquire API Key: KAFKA-17620 URL: https://issues.apache.org/jira/browse/KAFKA-17620 Project: Kafka Issue Type: Sub-task Reporter: Apoorv Mittal Assignee: Apoorv Mittal Simplify the share partition acquire API to remove the completable future, as there are no longer any asynchronous calls. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-13938) Jenkins builds are timing out after streams integration tests
[ https://issues.apache.org/jira/browse/KAFKA-13938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884953#comment-17884953 ] João Pedro Fonseca commented on KAFKA-13938: Hi, [~mumrah]! Since Jenkins was disabled yesterday, could this task be closed? > Jenkins builds are timing out after streams integration tests > - > > Key: KAFKA-13938 > URL: https://issues.apache.org/jira/browse/KAFKA-13938 > Project: Kafka > Issue Type: Task > Components: build >Reporter: David Arthur >Priority: Major > > Jenkins PR builder is sporadically failing with timeouts. A few examples: > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/view/change-requests/job/PR-12136/5/execution/node/137/log/ > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/view/change-requests/job/PR-12207/1/execution/node/137/log/ > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/view/change-requests/job/PR-12062/7/execution/node/138/log/ > In these examples, the timeout occurs after > 22:14:00 streams-5: SMOKE-TEST-CLIENT-CLOSED > 22:14:00 streams-6: SMOKE-TEST-CLIENT-CLOSED > 22:14:00 streams-3: SMOKE-TEST-CLIENT-CLOSED > 22:14:00 streams-1: SMOKE-TEST-CLIENT-CLOSED > 22:14:00 streams-2: SMOKE-TEST-CLIENT-CLOSED > 22:14:00 streams-4: SMOKE-TEST-CLIENT-CLOSED > 22:14:00 streams-0: SMOKE-TEST-CLIENT-CLOSED > 04:07:06 Sending interrupt signal to process > 04:07:17 > 04:07:17 > Task :core:integrationTest FAILED > 04:07:17 > 04:07:17 FAILURE: Build failed with an exception. > These examples also include an uncaught exception just before the apparent > stall > 05:56:05 Exception: java.lang.AssertionError thrown from the > UncaughtExceptionHandler in thread > "appId_StreamsUncaughtExceptionHandlerIntegrationTestnull-badff0e3-a316-425b-841c-28352cd553ac-StreamThread-1" -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17488: Cleanup (test) code for Kafka Streams "metric version" [kafka]
fonsdant commented on code in PR #17182: URL: https://github.com/apache/kafka/pull/17182#discussion_r1776825720 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ## @@ -3030,7 +3030,7 @@ private StreamTask createSingleSourceStateless(final StreamsConfig config, topology, consumer, new TopologyConfig(null, config, new Properties()).getTaskConfig(), -new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion, time), Review Comment: Sure! Thanks for this catch, Matthias! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17154: New consumer subscribe may join group without a call to consumer.poll [kafka]
lianetm commented on code in PR #17165: URL: https://github.com/apache/kafka/pull/17165#discussion_r1776831516 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java: ## @@ -458,14 +466,24 @@ String memberIdInfoForLog() { } /** - * Join the group with the updated subscription, if the member is not part of it yet. If the - * member is already part of the group, this will only ensure that the updated subscription + * Set {@link #subscriptionUpdated} to true to indicate that the subscription has been updated. + * The next {@link #maybeJoinGroup()} will join the group with the updated subscription, if the member is not part of it yet. + * If the member is already part of the group, this will only ensure that the updated subscription * is included in the next heartbeat request. * * Note that list of topics of the subscription is taken from the shared subscription state. */ public void onSubscriptionUpdated() { -if (state == MemberState.UNSUBSCRIBED) { +subscriptionUpdated.compareAndSet(false, true); +} + +/** + * Join the group if the member is not part of it yet. This function separates {@link #transitionToJoining} + * from the {@link #onSubscriptionUpdated} to fulfill the requirement of the "rebalances will only occur during an + * active call to {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(Duration)}" + */ +public void maybeJoinGroup() { Review Comment: nit: what do you think about calling this something like `onConsumerPoll`? Just to make the func self explanatory (on poll -> transition to joining), no need to find usages to understand what this is all about. No strong feeling though, up to you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17505: New consumer seekToBeginning/End should run in background thread [kafka]
lianetm commented on code in PR #17230: URL: https://github.com/apache/kafka/pull/17230#discussion_r1776850858 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -842,8 +843,8 @@ public void seekToEnd(Collection<TopicPartition> partitions) { acquireAndEnsureOpen(); try { -Collection<TopicPartition> parts = partitions.isEmpty() ? subscriptions.assignedPartitions() : partitions; -subscriptions.requestOffsetReset(parts, OffsetResetStrategy.LATEST); +cachedSubscriptionHasAllFetchPositions = false; +applicationEventHandler.add(new ResetOffsetEvent(partitions, OffsetResetStrategy.LATEST)); Review Comment: Hey here, sorry for the late reply. I agree that these 3 methods should have similar behaviour, but I wonder if the case is that they should all use `addAndGet`? My concern is, when seeking (either of the 3 methods), we do expect that as soon as they complete, the position is updated so that other funcs will see it (ex. currentLag). We would not be respecting that anymore if we use add; seek could complete without having updated the offsets, right? From the seek contract, it states that "Overrides the fetch offsets that the consumer will use on the next {@link #poll(Duration) poll(timeout)}", so we need to: 1. override fetch offsets (this requires `addAndGet` I expect) 2. make sure they are available on the next poll (again addAndGet on seek, otherwise consumer.seek + consumer.poll may not find the offsets if the poll happens when the `seek` with `add` is still being processed in the background, right?) Thoughts? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17505: New consumer seekToBeginning/End should run in background thread [kafka]
lianetm commented on code in PR #17230: URL: https://github.com/apache/kafka/pull/17230#discussion_r1776850858 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -842,8 +843,8 @@ public void seekToEnd(Collection<TopicPartition> partitions) { acquireAndEnsureOpen(); try { -Collection<TopicPartition> parts = partitions.isEmpty() ? subscriptions.assignedPartitions() : partitions; -subscriptions.requestOffsetReset(parts, OffsetResetStrategy.LATEST); +cachedSubscriptionHasAllFetchPositions = false; +applicationEventHandler.add(new ResetOffsetEvent(partitions, OffsetResetStrategy.LATEST)); Review Comment: Hey here, sorry for the late reply. I agree that these 3 methods should have similar behaviour, but I wonder if the case is that they should all use `addAndGet`? My concern is, when seeking (either of the 3 methods), we do expect that as soon as they complete, the position is updated so that other funcs will see it (ex. currentLag, poll). We would not be respecting that anymore if we use add; seek could complete without having updated the offsets, right? From the seek contract, it states that "Overrides the fetch offsets that the consumer will use on the next {@link #poll(Duration) poll(timeout)}", so we need to: 1. override fetch offsets (this requires `addAndGet` I expect) 2. make sure they are available on the next poll (again addAndGet on seek, otherwise consumer.seek + consumer.poll may not find the offsets if the poll happens when the `seek` with `add` is still being processed in the background, right?). Also, calls like currentLag after that seek + add might not have the info they need, I guess. Thoughts? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
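For context on the `add` vs `addAndGet` distinction discussed in this thread: `add` enqueues an event for the background thread and returns immediately, while `addAndGet` blocks until the background thread has processed it. A simplified, hypothetical sketch of the two shapes follows; it is not the consumer's actual internals, just an illustration of the seek-then-poll race the comment describes.
```java
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class EventHandlerSketch {
    interface Event { CompletableFuture<Void> future(); }

    private final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();

    // add: enqueue and return immediately; the application thread may race
    // ahead of the background thread (the seek-then-poll hazard above).
    void add(Event event) {
        queue.offer(event);
    }

    // addAndGet: enqueue, then block until the background thread completes
    // the event's future, so a subsequent poll() observes updated positions.
    void addAndGet(Event event, Duration timeout) throws Exception {
        queue.offer(event);
        event.future().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
    }
}
```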
[jira] [Resolved] (KAFKA-17496) Add heterogeneous configuration to TargetAssignmentBuilderBenchmark
[ https://issues.apache.org/jira/browse/KAFKA-17496?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-17496. - Fix Version/s: 4.0.0 Resolution: Fixed > Add heterogeneous configuration to TargetAssignmentBuilderBenchmark > --- > > Key: KAFKA-17496 > URL: https://issues.apache.org/jira/browse/KAFKA-17496 > Project: Kafka > Issue Type: Sub-task >Reporter: Sean Quah >Assignee: Sean Quah >Priority: Minor > Fix For: 4.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17616) Remove KafkaServer
[ https://issues.apache.org/jira/browse/KAFKA-17616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884885#comment-17884885 ] Dmitry Werner commented on KAFKA-17616: --- [~cmccabe] Hello, if you have not started working on this issue, I would like to take it. > Remove KafkaServer > -- > > Key: KAFKA-17616 > URL: https://issues.apache.org/jira/browse/KAFKA-17616 > Project: Kafka > Issue Type: Sub-task >Reporter: Colin McCabe >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17571; Revert "MINOR: Log pending join members (#17219)" [kafka]
dajac commented on PR #17274: URL: https://github.com/apache/kafka/pull/17274#issuecomment-2376097825 @mumrah @chia7712 Long story short, I did not find any correctness issue in the new group coordinator. All the failed tests were due to pending members in the classic group preventing the group from stabilizing itself in time. This is actually a known source of flakiness in this area. Chris has been working on a patch to improve it: https://issues.apache.org/jira/browse/KAFKA-17115. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17571; Revert "MINOR: Log pending join members (#17219)" [kafka]
dajac merged PR #17274: URL: https://github.com/apache/kafka/pull/17274 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-17571) Revert #17219
[ https://issues.apache.org/jira/browse/KAFKA-17571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-17571. - Fix Version/s: 4.0.0 Resolution: Fixed > Revert #17219 > - > > Key: KAFKA-17571 > URL: https://issues.apache.org/jira/browse/KAFKA-17571 > Project: Kafka > Issue Type: Sub-task >Affects Versions: 4.0.0 >Reporter: David Jacot >Assignee: David Jacot >Priority: Blocker > Fix For: 4.0.0 > > > Revert https://github.com/apache/kafka/pull/17219 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17109: Move lock backoff retry to streams TaskManager [kafka]
cadonna commented on PR #17209: URL: https://github.com/apache/kafka/pull/17209#issuecomment-2376116982 @aliehsaeedii the following test fails consistently with an NPE with this PR: `StreamThreadTest.shouldRecordCommitLatency()`. See https://github.com/apache/kafka/actions/runs/11030675352?pr=17209 Could you have a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-17584) Fix incorrect synonym handling for dynamic log configurations
[ https://issues.apache.org/jira/browse/KAFKA-17584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen resolved KAFKA-17584. --- Fix Version/s: 3.9.0 3.8.1 Resolution: Fixed > Fix incorrect synonym handling for dynamic log configurations > - > > Key: KAFKA-17584 > URL: https://issues.apache.org/jira/browse/KAFKA-17584 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.9.0 >Reporter: Christo Lolov >Assignee: Christo Lolov >Priority: Blocker > Fix For: 3.9.0, 3.8.1 > > > Updating certain dynamic configurations (for example `message.max.bytes`) > causes retention based on time to reset to the default value (source code) > for log.retention.ms. This poses a durability issue if users have set their > retention by using log.retention.hours or log.retention.minutes. In other > words, if a user has set log.retention.hours=-1 (infinite retention) and they > dynamically change `message.max.bytes` their retention will immediately > change back to the default of 604800000 ms (7 days) and data before this will > be scheduled for deletion immediately. > Steps to reproduce: > 1. Add log.retention.minutes=1,log.retention.check.interval.ms=1000 to > server.properties > 2. Start a single ZK or KRaft instance + a single Kafka instance > 3. Create a topic using > {code:java} > bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic A > --replication-factor 1 --partitions 1 --config min.insync.replicas=1 --config > segment.bytes=512{code} > 4. Create a few segments with the console producer > 5. Observe that they are deleted after 1 minute > 6. Use the following command > {code:java} > bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers > --entity-default --alter --add-config message.max.bytes=1048609{code} > (the value of `message.max.bytes` is irrelevant) > 7. Create a few more segments with the console producer > 8. Observe that segments are no longer deleted after 1 minute -- This message was sent by Atlassian Jira (v8.20.10#820010)
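For context, time-based retention is resolved from three synonymous properties, and the bug above amounts to losing that resolution on a dynamic update. Below is a minimal sketch of the intended precedence (log.retention.ms over log.retention.minutes over log.retention.hours); it is illustrative only, not the broker's actual resolution code:

{code:java}
import java.util.Properties;

public class RetentionResolution {
    // Sketch: log.retention.ms wins over log.retention.minutes, which wins
    // over log.retention.hours. Negative values (e.g. -1) stay negative after
    // conversion, meaning time-based retention is disabled.
    static long effectiveRetentionMs(Properties props) {
        String ms = props.getProperty("log.retention.ms");
        if (ms != null) return Long.parseLong(ms);
        String minutes = props.getProperty("log.retention.minutes");
        if (minutes != null) return Long.parseLong(minutes) * 60_000L;
        String hours = props.getProperty("log.retention.hours");
        if (hours != null) return Long.parseLong(hours) * 3_600_000L;
        return 7L * 24 * 60 * 60 * 1000; // default: 604800000 ms (7 days)
    }
}
{code}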
Re: [PR] KAFKA-17367: Share coordinator impl. New merge batches algorithm. [3/N] [kafka]
smjn commented on code in PR #17149: URL: https://github.com/apache/kafka/pull/17149#discussion_r1776535621 ## share-coordinator/src/test/java/org/apache/kafka/coordinator/share/StateBatchUtilTest.java: ## @@ -0,0 +1,472 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.share; + +import org.apache.kafka.server.share.PersisterStateBatch; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class StateBatchUtilTest { +static class BatchTestHolder { +final String testName; +final List curList; +final List newList; +final List expectedResult; +final long startOffset; +final boolean shouldRun; + +BatchTestHolder(String testName, +List curList, +List newList, +List expectedResult, +long startOffset) { +this(testName, curList, newList, expectedResult, startOffset, true); +} + +BatchTestHolder(String testName, +List curList, +List newList, +List expectedResult, +long startOffset, +boolean shouldRun) { +this.testName = testName; +this.curList = curList; +this.newList = newList; +this.expectedResult = expectedResult; +this.startOffset = startOffset; +this.shouldRun = shouldRun; +} + +@Override +public String toString() { +return this.testName; +} +} + +@SuppressWarnings({"MethodLength"}) +private static Stream generator() { +return Stream.of( +new BatchTestHolder( +"Current batches with start offset midway are pruned.", +Collections.singletonList( +new PersisterStateBatch(100, 130, (byte) 0, (short) 1) +), +Collections.emptyList(), +Collections.singletonList( +new PersisterStateBatch(120, 130, (byte) 0, (short) 1) +), +120 +), + +new BatchTestHolder( +"New batches with start offset midway are pruned.", +Collections.emptyList(), +Collections.singletonList( +new PersisterStateBatch(100, 130, (byte) 0, (short) 1) +), +Collections.singletonList( +new PersisterStateBatch(120, 130, (byte) 0, (short) 1) +), +120 +), + +new BatchTestHolder( +"Both current and new batches empty.", +Collections.emptyList(), +Collections.emptyList(), +Collections.emptyList(), +120 +), + +// same state +new BatchTestHolder( +"Same state. Last and candidate have same first and last offset.", +Collections.singletonList( +new PersisterStateBatch(100, 110, (byte) 0, (short) 1) +), +Collections.singletonList( +new PersisterStateBatch(100, 110, (byte) 0, (short) 1) +), +Collections.singletonList( +new PersisterStateBatch(100, 110, (byte) 0, (short) 1) +), +-1 +), + +new BatchTestHolder( +"Same state. 
Last and candidate have same first offset, candidate last offset strictly smaller.", +Collections.singletonList( +new PersisterStateBatch(100, 110, (byte) 0, (short) 1) +), +Collections.singletonList( +new PersisterStateBatch(100, 105, (byte) 0, (short) 1) +), +
Re: [PR] KAFKA-17367: Share coordinator impl. New merge batches algorithm. [3/N] [kafka]
smjn commented on code in PR #17149: URL: https://github.com/apache/kafka/pull/17149#discussion_r1776452137 ## share-coordinator/src/main/java/org/apache/kafka/coordinator/share/StateBatchUtil.java: ## @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.share; + +import org.apache.kafka.coordinator.share.generated.ShareUpdateValue; +import org.apache.kafka.server.share.PersisterStateBatch; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.TreeSet; + +public class StateBatchUtil { +/** + * Util method which takes in 2 lists containing {@link PersisterStateBatch} + * and the startOffset. + * This method removes any batches where the lastOffset < startOffset, if the startOffset > -1. + * It then merges any contiguous intervals with same state. If states differ, + * based on various conditions it creates new non-overlapping batches preferring new ones. + * @param batchesSoFar - List containing current soft state of {@link PersisterStateBatch} + * @param newBatches - List containing {@link PersisterStateBatch} in incoming request + * @param startOffset - startOffset to consider when removing old batches. + * @return List containing combined batches + */ +public static List combineStateBatches( +List batchesSoFar, +List newBatches, +long startOffset +) { +List combinedList = new ArrayList<>(batchesSoFar.size() + newBatches.size()); +combinedList.addAll(batchesSoFar); +combinedList.addAll(newBatches); + +return mergeBatches( +pruneBatches( +combinedList, +startOffset +) +); +} + +/** + * Encapsulates the main merge algorithm. Consider 2 batches (A, B): + * - Same state (delivery count and state) + * - If overlapping - merge into single batch + * - If contiguous (A.lastOffset + 1 == B.firstOffset) - merge batches into a single 1 + * - Different state (delivery count or state differ) + * - Based on various cases: + * - swallow lower priority batch within bounds of offsets + * - break batch into other non-overlapping batches + * @param batches - List of {@link PersisterStateBatch} + * @return List of non-overlapping {@link PersisterStateBatch} + */ +private static List mergeBatches(List batches) { +if (batches.size() < 2) { +return batches; +} +TreeSet sortedBatches = new TreeSet<>(batches); +List finalBatches = new ArrayList<>(batches.size() * 2); // heuristic size + +BatchOverlapState overlapState = getOverlappingState(sortedBatches); + +while (overlapState != BatchOverlapState.SENTINEL) { Review Comment: No the logic is not that simple. A single iteration over the sorted batches might not be enough. 
The invariant is that the batches remain sorted even after manipulation. Consider: ``` A [1,10,0,1] --- B [5,7,0,2] -- C [5,15,0,3] ``` A and B will combine to ``` [1,4,0,1] [5,7,0,2] ---[8,10,0,1] ``` Now when combining with C, we have 2 previous batches to consider. Secondly, ``` A [1,10,0,1] --- B [5,7,0,2] --- C [5,7,0,3] ``` A and B will combine to ``` [1,4,0,1] [5,7,0,2] ---[8,10,0,1] --- <- C - we broke the sorted-batches invariant ``` In the current impl, these situations are implicitly handled by virtue of the treeset. Any newly generated batches are pushed back into the treeset and the `getOverlappingState` method finds the first overlapping pair and returns the non-overlapping prefix. The non-overlapping prefix is then REMOVED from the treeset; hence, once a batch is no longer overlapping, it is not considered again.
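To make the same-state half of the algorithm concrete, here is a minimal sketch that collapses overlapping or contiguous batches with equal state and delivery count. The priority-based splitting for differing states discussed above is deliberately left out, and the accessor names on PersisterStateBatch are assumed for the example:

```java
// Sketch of the same-state merge only: overlapping or contiguous batches with
// identical (deliveryState, deliveryCount) collapse into a single batch.
static List<PersisterStateBatch> mergeSameState(TreeSet<PersisterStateBatch> sorted) {
    List<PersisterStateBatch> out = new ArrayList<>();
    PersisterStateBatch prev = null;
    for (PersisterStateBatch cur : sorted) {
        boolean sameState = prev != null
            && prev.deliveryState() == cur.deliveryState()
            && prev.deliveryCount() == cur.deliveryCount();
        if (sameState && cur.firstOffset() <= prev.lastOffset() + 1) {
            // Overlapping or contiguous with the same state: extend prev.
            prev = new PersisterStateBatch(prev.firstOffset(),
                Math.max(prev.lastOffset(), cur.lastOffset()),
                prev.deliveryState(), prev.deliveryCount());
        } else {
            if (prev != null) out.add(prev);
            prev = cur;
        }
    }
    if (prev != null) out.add(prev);
    return out;
}
```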
[jira] [Updated] (KAFKA-17618) group consumer heartbeat interval should be less than session timeout
[ https://issues.apache.org/jira/browse/KAFKA-17618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] PoAn Yang updated KAFKA-17618: -- Labels: kip-848 (was: ) > group consumer heartbeat interval should be less than session timeout > - > > Key: KAFKA-17618 > URL: https://issues.apache.org/jira/browse/KAFKA-17618 > Project: Kafka > Issue Type: Task >Reporter: PoAn Yang >Assignee: PoAn Yang >Priority: Minor > Labels: kip-848 > > [KIP-848|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217387038#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-Heartbeat&Session] > mentions: > bq. The member is expected to heartbeat every > group.consumer.heartbeat.interval.ms in order to keep its session opened. If > it does not heartbeat at least once within the > group.consumer.session.timeout.ms, the group coordinator will kick the member > out from the group. > To prevent users from configuring _group.consumer.heartbeat.interval.ms_ to be bigger than > _group.consumer.session.timeout.ms_, we can add validation for it. > We can do similar validation for _group.share.heartbeat.interval.ms_ and > _group.share.session.timeout.ms_ as well. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-17618) group consumer heartbeat interval should be less than session timeout
PoAn Yang created KAFKA-17618: - Summary: group consumer heartbeat interval should be less than session timeout Key: KAFKA-17618 URL: https://issues.apache.org/jira/browse/KAFKA-17618 Project: Kafka Issue Type: Task Reporter: PoAn Yang Assignee: PoAn Yang [KIP-848|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217387038#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-Heartbeat&Session] mentions: bq. The member is expected to heartbeat every group.consumer.heartbeat.interval.ms in order to keep its session opened. If it does not heartbeat at least once within the group.consumer.session.timeout.ms, the group coordinator will kick the member out from the group. To prevent users from configuring _group.consumer.heartbeat.interval.ms_ to be bigger than _group.consumer.session.timeout.ms_, we can add validation for it. We can do similar validation for _group.share.heartbeat.interval.ms_ and _group.share.session.timeout.ms_ as well. -- This message was sent by Atlassian Jira (v8.20.10#820010)
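A minimal sketch of the proposed check, assuming the two configs are already parsed to ints; the class and method names are illustrative only:

{code:java}
import org.apache.kafka.common.config.ConfigException;

public class GroupConfigValidation {
    // Sketch: reject configurations where the heartbeat interval is not
    // strictly smaller than the session timeout, per the KIP-848 text above.
    static void validate(int heartbeatIntervalMs, int sessionTimeoutMs) {
        if (heartbeatIntervalMs >= sessionTimeoutMs) {
            throw new ConfigException("group.consumer.heartbeat.interval.ms ("
                + heartbeatIntervalMs + ") must be less than "
                + "group.consumer.session.timeout.ms (" + sessionTimeoutMs + ")");
        }
    }
}
{code}

The same shape would apply to the group.share.heartbeat.interval.ms and group.share.session.timeout.ms pair.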
Re: [PR] KAFKA-15859: Add timeout field to the ListOffsets request [kafka]
mumrah commented on PR #17112: URL: https://github.com/apache/kafka/pull/17112#issuecomment-2377323567 @satishd @showuon this PR broke some tests and has caused trunk builds to fail. The latest CI run for this PR was https://github.com/apache/kafka/actions/runs/11027485627 which shows both of the JUnit steps failing. Unlike with Jenkins, we need to look at _every_ failed test build -- especially before merging. Looking at these two failed jobs, we can see we have actual failed tests and not just flaky ones: https://github.com/user-attachments/assets/f02251bc-5dff-4d22-92e1-b4e01a5153fe https://github.com/apache/kafka/actions/runs/11027485627/job/30626323008 We want to see all the status checks in the PR to be green before merging. I would like to have branch protections in place to prevent this sort of regression, but until we sort out the flaky tests we can't really. Anyways, I'm not meaning to pick on this PR -- just trying to raise awareness of our "new normal" for build expectations :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-17617) New GitHub Actions build builds Java 8 with 2.13 instead of 2.12
[ https://issues.apache.org/jira/browse/KAFKA-17617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur reassigned KAFKA-17617: Assignee: David Arthur > New GitHub Actions build builds Java 8 with 2.13 instead of 2.12 > > > Key: KAFKA-17617 > URL: https://issues.apache.org/jira/browse/KAFKA-17617 > Project: Kafka > Issue Type: Task >Reporter: Justine Olshan >Assignee: David Arthur >Priority: Minor > Fix For: 3.7.2, 3.8.1, 3.9.1 > > > I noticed a PR that failed for Jenkins but not for the GitHub Actions build. > Tracing it down, it looks like Jenkins was using Scala 2.12 and GitHub > actions is using 2.13. > We still should support 2.12, so we should fix the GitHub actions now that > Jenkins is removed. Until we fix, folks can merge in code that breaks 2.12 > builds. > See Jenkins for > failure:[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-17093/5/cloudbees-pipeline-explorer/?filter=37] > see for success in GH Actions: > [https://github.com/apache/kafka/actions/runs/11039342155/job/30672276597?pr=17093] > see raw build for 2.13: > [https://productionresultssa2.blob.core.windows.net/actions-results/4f08a774-9253-4d15-8617-a627e9961b76/workflow-job-run-716dd2a8-3073-58cf-0d26-4a389b46b592/logs/job/job-logs.txt?rsct=text%2Fplain&se=2024-09-26T01%3A05%3A04Z&sig=klIXSOTwKN9WrBvtdsN6j45DbSqg7ikwow%2FGETJy5pc%3D&ske=2024-09-26T12%3A26%3A46Z&skoid=ca7593d4-ee42-46cd-af88-8b886a2f84eb&sks=b&skt=2024-09-26T00%3A26%3A46Z&sktid=398a6654-997b-47e9-b12b-9515b896b4de&skv=2024-05-04&sp=r&spr=https&sr=b&st=2024-09-26T00%3A54%3A59Z&sv=2024-05-04] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] HOTFIX: fix failed cases in FeatureCommandTest [kafka]
kamalcph commented on PR #17287: URL: https://github.com/apache/kafka/pull/17287#issuecomment-2377344257 Thanks @FrankYang0529 for fixing this test! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15859: Add timeout field to the ListOffsets request [kafka]
kamalcph commented on PR #17112: URL: https://github.com/apache/kafka/pull/17112#issuecomment-2377346567 The failed tests are fixed in #17287 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17621; Reduce logging verbosity on ConsumerGroupHeartbeat path [kafka]
dajac commented on code in PR #17288: URL: https://github.com/apache/kafka/pull/17288#discussion_r1777317218 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1984,8 +1986,10 @@ private CoordinatorResult classicGroupJoinToConsumerGro ); if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { -log.info("[GroupId {}] Computed new subscription metadata: {}.", -groupId, subscriptionMetadata); +if (log.isDebugEnabled()) { +log.info("[GroupId {}] Computed new subscription metadata: {}.", Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17621; Reduce logging verbosity on ConsumerGroupHeartbeat path [kafka]
dajac commented on code in PR #17288: URL: https://github.com/apache/kafka/pull/17288#discussion_r1777312193 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1984,8 +1986,10 @@ private CoordinatorResult classicGroupJoinToConsumerGro ); if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { -log.info("[GroupId {}] Computed new subscription metadata: {}.", -groupId, subscriptionMetadata); +if (log.isDebugEnabled()) { +log.info("[GroupId {}] Computed new subscription metadata: {}.", Review Comment: It should be debug too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
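The quoted diff guards a log.info call with isDebugEnabled(); the review resolves this by logging at debug so the guard and the level agree. A sketch of the corrected pattern, reusing the names from the diff:

```java
// Corrected pattern: guard level and log level match. With parameterized
// SLF4J-style logging the guard is technically optional (arguments are only
// formatted when debug is enabled), but it documents the intent and skips
// any argument-side work when debug logging is off.
if (log.isDebugEnabled()) {
    log.debug("[GroupId {}] Computed new subscription metadata: {}.",
        groupId, subscriptionMetadata);
}
```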
Re: [PR] HOTFIX: fix failed cases in FeatureCommandTest [kafka]
mumrah merged PR #17287: URL: https://github.com/apache/kafka/pull/17287 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17078: Add SecurityManagerCompatibility shim [kafka]
gharris1727 commented on PR #16522: URL: https://github.com/apache/kafka/pull/16522#issuecomment-2377340024 Hey @danishnawab, thanks for reminding us. Since this may only be included in future releases (3.7.2, 3.8.1, 3.9.1, 4.0.0), for existing releases you can use the workaround of setting the system property `java.security.manager` to `allow`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16792: Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0) [kafka]
lianetm merged PR #16982: URL: https://github.com/apache/kafka/pull/16982 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-17623) Flaky testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback
[ https://issues.apache.org/jira/browse/KAFKA-17623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17623: -- Component/s: clients > Flaky > testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback > > > Key: KAFKA-17623 > URL: https://issues.apache.org/jira/browse/KAFKA-17623 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: PoAn Yang >Priority: Major > Labels: consumer-threading-refactor > > Flaky for the new consumer, failing with : > org.apache.kafka.common.KafkaException: User rebalance callback throws an > error at > app//org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:259) > at > app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.invokeRebalanceCallbacks(AsyncKafkaConsumer.java:1867) > at > app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer$BackgroundEventProcessor.process(AsyncKafkaConsumer.java:195) > at > app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer$BackgroundEventProcessor.process(AsyncKafkaConsumer.java:181) > at > app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.processBackgroundEvents(AsyncKafkaConsumer.java:1758) > at > app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.updateAssignmentMetadataIfNeeded(AsyncKafkaConsumer.java:1618) > ... > Caused by: java.lang.IllegalStateException: No current assignment for > partition topic-0 at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:378) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:395) > at > org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor.process(ApplicationEventProcessor.java:425) > at > org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor.process(ApplicationEventProcessor.java:147) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.processApplicationEvents(ConsumerNetworkThread.java:171) > > Flaky behaviour: > > https://ge.apache.org/scans/tests?search.buildOutcome=failure&search.names=Git%20branch&search.rootProjectNames=kafka&search.startTimeMax=172740959&search.startTimeMin=172248480&search.timeZoneId=America%2FToronto&search.values=trunk&tests.container=integration.kafka.api.PlaintextConsumerCallbackTest&tests.test=testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback(String%2C%20String)%5B3%5D -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14562 [1/2]: Implement epoch bump after every transaction [kafka]
jolshan merged PR #16719: URL: https://github.com/apache/kafka/pull/16719 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17581: AsyncKafkaConsumer can't unsubscribe invalid topics [kafka]
kirktrue commented on code in PR #17244: URL: https://github.com/apache/kafka/pull/17244#discussion_r1777422783 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1279,7 +1281,7 @@ private void releaseAssignmentAndLeaveGroup(final Timer timer) { UnsubscribeEvent unsubscribeEvent = new UnsubscribeEvent(calculateDeadlineMs(timer)); applicationEventHandler.add(unsubscribeEvent); try { -processBackgroundEvents(unsubscribeEvent.future(), timer); +processBackgroundEvents(unsubscribeEvent.future(), timer, e -> e instanceof InvalidTopicException); Review Comment: Is there a chance that the `InvalidTopicException` could be nested inside another exception? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
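If the exception can indeed arrive wrapped (for example inside a KafkaException), the predicate could walk the cause chain rather than relying on a plain instanceof check. A hedged sketch, with a helper name invented for the example:

```java
// Sketch: true if t, or any exception in its cause chain, is an instance of
// target. The helper name is illustrative, not the PR's actual code.
static boolean isOrWraps(Throwable t, Class<? extends Throwable> target) {
    for (Throwable cur = t; cur != null; cur = cur.getCause()) {
        if (target.isInstance(cur)) {
            return true;
        }
        if (cur.getCause() == cur) {
            break; // defend against self-referential causes
        }
    }
    return false;
}
```

The predicate at the call site above would then become `e -> isOrWraps(e, InvalidTopicException.class)`.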
Re: [PR] KAFKA-17154: New consumer subscribe may join group without a call to consumer.poll [kafka]
FrankYang0529 commented on code in PR #17165: URL: https://github.com/apache/kafka/pull/17165#discussion_r1776908662 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java: ## @@ -458,14 +466,24 @@ String memberIdInfoForLog() { } /** - * Join the group with the updated subscription, if the member is not part of it yet. If the - * member is already part of the group, this will only ensure that the updated subscription + * Set {@link #subscriptionUpdated} to true to indicate that the subscription has been updated. + * The next {@link #maybeJoinGroup()} will join the group with the updated subscription, if the member is not part of it yet. + * If the member is already part of the group, this will only ensure that the updated subscription * is included in the next heartbeat request. * * Note that list of topics of the subscription is taken from the shared subscription state. */ public void onSubscriptionUpdated() { -if (state == MemberState.UNSUBSCRIBED) { +subscriptionUpdated.compareAndSet(false, true); +} + +/** + * Join the group if the member is not part of it yet. This function separates {@link #transitionToJoining} + * from the {@link #onSubscriptionUpdated} to fulfill the requirement of the "rebalances will only occur during an + * active call to {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(Duration)}" + */ +public void maybeJoinGroup() { Review Comment: Thanks for the suggestion. I updated the PR description and changed `maybeJoinGroup` to `onConsumerPoll`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16733: Add share group record support to OffsetsMessageParser [kafka]
AndrewJSchofield opened a new pull request, #17282: URL: https://github.com/apache/kafka/pull/17282 KIP-932 added a bunch of new record schemas to the consumer offsets topic which need to be added to the OffsetsMessageParser in kafka-dump-log.sh. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16394) ForeignKey LEFT join propagates null value on foreignKey change
[ https://issues.apache.org/jira/browse/KAFKA-16394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17885016#comment-17885016 ] Andras Hatvani commented on KAFKA-16394: [~mjsax] Any update on the fix and release of this issue? I'm currently stuck on 3.6.1 in a project with dozens of modules due to this error. > ForeignKey LEFT join propagates null value on foreignKey change > --- > > Key: KAFKA-16394 > URL: https://issues.apache.org/jira/browse/KAFKA-16394 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Ayoub Omari >Assignee: Ayoub Omari >Priority: Major > Attachments: ForeignJoinTest.scala, JsonSerde.scala > > > We have two topics : _left-topic[String, LeftRecord]_ and > _right-topic[String, String]_ > where _LeftRecord_ : > {code:scala} > case class LeftRecord(foreignKey: String, name: String){code} > we do a simple *LEFT* foreign key join on left-topic's foreignKey field. The > resulting join value is the value in right-topic. > > +*Scenario1: change foreignKey*+ > Input the following > {code:scala} > rightTopic.pipeInput("fk1", "1") > rightTopic.pipeInput("fk2", "2") > leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1")) > leftTopic.pipeInput("pk1", LeftRecord("fk2", "pk1")) > {code} > > *+Expected result+* > {code:scala} > KeyValue(pk1, 1) > KeyValue(pk1, 2){code} > > *+Actual result+* > {code:scala} > KeyValue(pk1, 1) > KeyValue(pk1, null) > KeyValue(pk1, 2){code} > > A null is propagated to the join result when the foreign key changes > > +*Scenario 2: Delete PrimaryKey*+ > Input > {code:scala} > rightTopic.pipeInput("fk1", "1") > rightTopic.pipeInput("fk2", "2") > leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1")) > leftTopic.pipeInput("pk1", null) {code} > > *+Expected result+* > {code:scala} > KeyValue(pk1, 1) > KeyValue(pk1, null) {code} > > *+Actual result+* > {code:java} > KeyValue(pk1, 1) > KeyValue(pk1, null) > KeyValue(pk1, null) {code} > An additional null is propagated to the join result. > > This bug doesn't exist on versions 3.6.0 and below. > > I believe the issue comes from the line > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L134] > where we propagate the deletion in the two scenarios above > > Attaching the topology I used. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-17619: Remove zk type and instance from ClusterTest [kafka]
FrankYang0529 opened a new pull request, #17284: URL: https://github.com/apache/kafka/pull/17284 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17367: Share coordinator impl. New merge batches algorithm. [3/N] [kafka]
AndrewJSchofield commented on code in PR #17149: URL: https://github.com/apache/kafka/pull/17149#discussion_r1777017514 ## share-coordinator/src/main/java/org/apache/kafka/coordinator/share/StateBatchUtil.java: ## @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.share; + +import org.apache.kafka.coordinator.share.generated.ShareUpdateValue; +import org.apache.kafka.server.share.PersisterStateBatch; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.TreeSet; + +public class StateBatchUtil { +/** + * Util method which takes in 2 lists containing {@link PersisterStateBatch} + * and the startOffset. + * This method removes any batches where the lastOffset < startOffset, if the startOffset > -1. + * It then merges any contiguous intervals with same state. If states differ, + * based on various conditions it creates new non-overlapping batches preferring new ones. + * @param batchesSoFar - List containing current soft state of {@link PersisterStateBatch} + * @param newBatches - List containing {@link PersisterStateBatch} in incoming request + * @param startOffset - startOffset to consider when removing old batches. + * @return List containing combined batches + */ +public static List combineStateBatches( +List batchesSoFar, +List newBatches, +long startOffset +) { +List combinedList = new ArrayList<>(batchesSoFar.size() + newBatches.size()); +combinedList.addAll(batchesSoFar); +combinedList.addAll(newBatches); + +return mergeBatches( +pruneBatches( +combinedList, +startOffset +) +); +} + +/** + * Encapsulates the main merge algorithm. 
Consider 2 batches (A, B): + * - Same state (delivery count and state) + * - If overlapping - merge into single batch + * - If contiguous (A.lastOffset + 1 == B.firstOffset) - merge batches into a single 1 + * - Different state (delivery count or state differ) + * - Based on various cases: + * - swallow lower priority batch within bounds of offsets + * - break batch into other non-overlapping batches + * @param batches - List of {@link PersisterStateBatch} + * @return List of non-overlapping {@link PersisterStateBatch} + */ +private static List mergeBatches(List batches) { +if (batches.size() < 2) { +return batches; +} +TreeSet sortedBatches = new TreeSet<>(batches); +List finalBatches = new ArrayList<>(batches.size() * 2); // heuristic size + +BatchOverlapState overlapState = getOverlappingState(sortedBatches); + +while (overlapState != BatchOverlapState.SENTINEL) { +PersisterStateBatch last = overlapState.last(); +PersisterStateBatch candidate = overlapState.candidate(); + +// remove non overlapping prefix from sortedBatches, +// will make getting next overlapping pair efficient +// as a prefix batch which is non overlapping will only +// be checked once. +if (overlapState.nonOverlapping() != null) { +overlapState.nonOverlapping().forEach(sortedBatches::remove); +finalBatches.addAll(overlapState.nonOverlapping()); +} + +if (candidate == null) { +overlapState = BatchOverlapState.SENTINEL; +continue; +} + +// remove both last and candidate for easier +// assessment about adding batches to sortedBatches +sortedBatches.remove(last); +sortedBatches.remove(candidate); + +// overlap and same state (last.firstOffset <= candidate.firstOffset) due to sort +// covers: +// case:12 34 5 6 7 (contiguous) +// last:__ ______ ___ ___ ___ +// candi
[PR] MINOR: Cache topic resolution in TopicIds set [kafka]
squah-confluent opened a new pull request, #17285: URL: https://github.com/apache/kafka/pull/17285 Looking up topics in a TopicsImage is relatively slow. Cache the results in TopicIds to improve assignor performance. In benchmarks, we see a noticeable improvement in performance in the heterogeneous case. Before ``` Benchmark (assignmentType) (assignorType) (isRackAware) (memberCount) (partitionsToMemberRatio) (subscriptionType) (topicCount) Mode CntScore Error Units ServerSideAssignorBenchmark.doAssignment INCREMENTAL RANGE false 1 10 HOMOGENEOUS 1000 avgt5 36.400 ± 3.004 ms/op ServerSideAssignorBenchmark.doAssignment INCREMENTAL RANGE false 1 10 HETEROGENEOUS 1000 avgt5 158.340 ± 0.825 ms/op ServerSideAssignorBenchmark.doAssignment INCREMENTAL UNIFORM false 1 10 HOMOGENEOUS 1000 avgt51.329 ± 0.041 ms/op ServerSideAssignorBenchmark.doAssignment INCREMENTAL UNIFORM false 1 10 HETEROGENEOUS 1000 avgt5 382.901 ± 6.203 ms/op ``` After ``` Benchmark (assignmentType) (assignorType) (isRackAware) (memberCount) (partitionsToMemberRatio) (subscriptionType) (topicCount) Mode CntScore Error Units ServerSideAssignorBenchmark.doAssignment INCREMENTAL RANGE false 1 10 HOMOGENEOUS 1000 avgt5 36.465 ± 1.954 ms/op ServerSideAssignorBenchmark.doAssignment INCREMENTAL RANGE false 1 10 HETEROGENEOUS 1000 avgt5 114.043 ± 1.424 ms/op ServerSideAssignorBenchmark.doAssignment INCREMENTAL UNIFORM false 1 10 HOMOGENEOUS 1000 avgt51.454 ± 0.019 ms/op ServerSideAssignorBenchmark.doAssignment INCREMENTAL UNIFORM false 1 10 HETEROGENEOUS 1000 avgt5 342.840 ± 2.744 ms/op ``` ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
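A minimal sketch of the memoization idea; the wrapper name and structure are illustrative rather than the PR's actual change:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsImage;

// Sketch: memoize topic-id -> name lookups so the assignor resolves each
// topic against the TopicsImage at most once per assignment computation.
final class CachingTopicResolver {
    private final TopicsImage image;
    private final Map<Uuid, String> cache = new HashMap<>();

    CachingTopicResolver(TopicsImage image) {
        this.image = image;
    }

    String topicName(Uuid topicId) {
        // computeIfAbsent does not store null results, so unknown ids are
        // simply re-resolved on later calls rather than negatively cached.
        return cache.computeIfAbsent(topicId, id -> {
            TopicImage topic = image.getTopic(id);
            return topic == null ? null : topic.name();
        });
    }
}
```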
[jira] [Created] (KAFKA-17624) Remove the E2E uses of accessing ACLs from zk
Chia-Ping Tsai created KAFKA-17624: -- Summary: Remove the E2E uses of accessing ACLs from zk Key: KAFKA-17624 URL: https://issues.apache.org/jira/browse/KAFKA-17624 Project: Kafka Issue Type: Sub-task Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai To remove ZooKeeper code from AclCommand, we first need to remove the related system tests. This task should remove zookeeper.py#list_acls and its usages, including zookeeper_tls_test.py, zookeeper_tls_encrypt_only_test.py, zookeeper_security_upgrade_test.py, `test_rolling_upgrade_phase_two`, `test_rolling_upgrade_sasl_mechanism_phase_two`, and upgrade_test.py#perform_upgrade. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17624) Remove the E2E uses of accessing ACLs from zk
[ https://issues.apache.org/jira/browse/KAFKA-17624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17885050#comment-17885050 ] TengYao Chi commented on KAFKA-17624: - Hi [~chia7712] If you have not started working on this issue, I would like to give it a try. :) > Remove the E2E uses of accessing ACLs from zk > - > > Key: KAFKA-17624 > URL: https://issues.apache.org/jira/browse/KAFKA-17624 > Project: Kafka > Issue Type: Sub-task >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > > To remove ZooKeeper code from AclCommand, we first need to remove the related > system tests. This task should remove zookeeper.py#list_acls and its usages, > including zookeeper_tls_test.py, zookeeper_tls_encrypt_only_test.py, > zookeeper_security_upgrade_test.py, `test_rolling_upgrade_phase_two`, > `test_rolling_upgrade_sasl_mechanism_phase_two`, and > upgrade_test.py#perform_upgrade. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] MINOR: fix failed cases in FeatureCommandTest [kafka]
FrankYang0529 opened a new pull request, #17287: URL: https://github.com/apache/kafka/pull/17287 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17317: Validate and maybe trigger downgrade after static member replacement [kafka]
dongnuo123 commented on PR #17008: URL: https://github.com/apache/kafka/pull/17008#issuecomment-2377169743 Reopened https://github.com/apache/kafka/pull/17286 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17540: Create floating tag on trunk for CI cache [kafka]
mumrah commented on code in PR #17204: URL: https://github.com/apache/kafka/pull/17204#discussion_r1777223177 ## committer-tools/update-cache.sh: ## @@ -0,0 +1,35 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +key=$( + gh cache list \ +--key 'gradle-home-v1|Linux-X64|test' \ +--sort 'created_at' \ +--limit 1 \ +--json 'key' \ +--jq '.[].key' +) + +cut -d '-' -f 5 <<< "$key" + +if ! git config --get alias.update-cache >/dev/null; then + this_path=$(realpath "$0") + git config alias.update-cache "!bash $this_path" + echo + echo "Now, you can use 'git update-cache' as alias to execute this script." +fi Review Comment: Although I like this feature, it's probably more polite to let the committer manually install the script as an alias rather than automatically modifying their git config. Instead of auto-installing it, have this if statement print instructions for adding the alias. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17540: Create floating tag on trunk for CI cache [kafka]
mumrah commented on PR #17204: URL: https://github.com/apache/kafka/pull/17204#issuecomment-2377207907 > Maybe, we keep it more agnostic and print a message guiding the user to fetch. WDYT? Good point on different remote names. Let's still attempt to update the ref in this script. If it can't be done, print a warning and recommend to the user to update their remote. ``` > git update-cache Cannot update 'trunk-cached' because SHA 264131cdaaef3f4696942f26534b3f61f3a2a162 does not exist locally. Please update your remote and try again. > echo $? 1 > git fetch origin-or-whatever > git update-cache Local branch 'trunk-cached' updated to 264131cdaaef3f4696942f26534b3f61f3a2a162 ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-17617) New GitHub Actions build builds Java 8 with 2.13 instead of 2.12
[ https://issues.apache.org/jira/browse/KAFKA-17617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-17617: - Fix Version/s: 3.9.1 > New GitHub Actions build builds Java 8 with 2.13 instead of 2.12 > > > Key: KAFKA-17617 > URL: https://issues.apache.org/jira/browse/KAFKA-17617 > Project: Kafka > Issue Type: Task >Reporter: Justine Olshan >Priority: Critical > Fix For: 3.9.1 > > > I noticed a PR that failed for Jenkins but not for the GitHub Actions build. > Tracing it down, it looks like Jenkins was using Scala 2.12 and GitHub > actions is using 2.13. > We still should support 2.12, so we should fix the GitHub actions now that > Jenkins is removed. Until we fix, folks can merge in code that breaks 2.12 > builds. > See Jenkins for > failure:[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-17093/5/cloudbees-pipeline-explorer/?filter=37] > see for success in GH Actions: > [https://github.com/apache/kafka/actions/runs/11039342155/job/30672276597?pr=17093] > see raw build for 2.13: > [https://productionresultssa2.blob.core.windows.net/actions-results/4f08a774-9253-4d15-8617-a627e9961b76/workflow-job-run-716dd2a8-3073-58cf-0d26-4a389b46b592/logs/job/job-logs.txt?rsct=text%2Fplain&se=2024-09-26T01%3A05%3A04Z&sig=klIXSOTwKN9WrBvtdsN6j45DbSqg7ikwow%2FGETJy5pc%3D&ske=2024-09-26T12%3A26%3A46Z&skoid=ca7593d4-ee42-46cd-af88-8b886a2f84eb&sks=b&skt=2024-09-26T00%3A26%3A46Z&sktid=398a6654-997b-47e9-b12b-9515b896b4de&skv=2024-05-04&sp=r&spr=https&sr=b&st=2024-09-26T00%3A54%3A59Z&sv=2024-05-04] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17617) New GitHub Actions build builds Java 8 with 2.13 instead of 2.12
[ https://issues.apache.org/jira/browse/KAFKA-17617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-17617: - Priority: Minor (was: Critical) > New GitHub Actions build builds Java 8 with 2.13 instead of 2.12 > > > Key: KAFKA-17617 > URL: https://issues.apache.org/jira/browse/KAFKA-17617 > Project: Kafka > Issue Type: Task >Reporter: Justine Olshan >Priority: Minor > Fix For: 3.9.1 > > > I noticed a PR that failed for Jenkins but not for the GitHub Actions build. > Tracing it down, it looks like Jenkins was using Scala 2.12 and GitHub > actions is using 2.13. > We still should support 2.12, so we should fix the GitHub actions now that > Jenkins is removed. Until we fix, folks can merge in code that breaks 2.12 > builds. > See Jenkins for > failure:[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-17093/5/cloudbees-pipeline-explorer/?filter=37] > see for success in GH Actions: > [https://github.com/apache/kafka/actions/runs/11039342155/job/30672276597?pr=17093] > see raw build for 2.13: > [https://productionresultssa2.blob.core.windows.net/actions-results/4f08a774-9253-4d15-8617-a627e9961b76/workflow-job-run-716dd2a8-3073-58cf-0d26-4a389b46b592/logs/job/job-logs.txt?rsct=text%2Fplain&se=2024-09-26T01%3A05%3A04Z&sig=klIXSOTwKN9WrBvtdsN6j45DbSqg7ikwow%2FGETJy5pc%3D&ske=2024-09-26T12%3A26%3A46Z&skoid=ca7593d4-ee42-46cd-af88-8b886a2f84eb&sks=b&skt=2024-09-26T00%3A26%3A46Z&sktid=398a6654-997b-47e9-b12b-9515b896b4de&skv=2024-05-04&sp=r&spr=https&sr=b&st=2024-09-26T00%3A54%3A59Z&sv=2024-05-04] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17617) New GitHub Actions build builds Java 8 with 2.13 instead of 2.12
[ https://issues.apache.org/jira/browse/KAFKA-17617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-17617: - Fix Version/s: 3.7.2 3.8.1 > New GitHub Actions build builds Java 8 with 2.13 instead of 2.12 > > > Key: KAFKA-17617 > URL: https://issues.apache.org/jira/browse/KAFKA-17617 > Project: Kafka > Issue Type: Task >Reporter: Justine Olshan >Priority: Minor > Fix For: 3.7.2, 3.8.1, 3.9.1 > > > I noticed a PR that failed for Jenkins but not for the GitHub Actions build. > Tracing it down, it looks like Jenkins was using Scala 2.12 and GitHub > actions is using 2.13. > We still should support 2.12, so we should fix the GitHub actions now that > Jenkins is removed. Until we fix, folks can merge in code that breaks 2.12 > builds. > See Jenkins for > failure:[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-17093/5/cloudbees-pipeline-explorer/?filter=37] > see for success in GH Actions: > [https://github.com/apache/kafka/actions/runs/11039342155/job/30672276597?pr=17093] > see raw build for 2.13: > [https://productionresultssa2.blob.core.windows.net/actions-results/4f08a774-9253-4d15-8617-a627e9961b76/workflow-job-run-716dd2a8-3073-58cf-0d26-4a389b46b592/logs/job/job-logs.txt?rsct=text%2Fplain&se=2024-09-26T01%3A05%3A04Z&sig=klIXSOTwKN9WrBvtdsN6j45DbSqg7ikwow%2FGETJy5pc%3D&ske=2024-09-26T12%3A26%3A46Z&skoid=ca7593d4-ee42-46cd-af88-8b886a2f84eb&sks=b&skt=2024-09-26T00%3A26%3A46Z&sktid=398a6654-997b-47e9-b12b-9515b896b4de&skv=2024-05-04&sp=r&spr=https&sr=b&st=2024-09-26T00%3A54%3A59Z&sv=2024-05-04] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17581: AsyncKafkaConsumer can't unsubscribe invalid topics [kafka]
lianetm commented on code in PR #17244: URL: https://github.com/apache/kafka/pull/17244#discussion_r1777270499 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala: ## @@ -244,6 +245,9 @@ class PlaintextConsumerSubscriptionTest extends AbstractConsumerTest { }, waitTimeMs = 5000, msg = "An InvalidTopicException should be thrown.") assertEquals(s"Invalid topics: [${invalidTopicName}]", exception.getMessage) +assertDoesNotThrow(new Executable { Review Comment: I like the idea of including the unsubscribe in the test, but with this we lose the coverage we initially had for subscribe(invalid) + close (if there's a regression and releaseAssignment stops ignoring the error we won't catch it). What about having 2 tests, mostly doing the same initially, but then one calls unsubscribe and asserts it does not throw (like we have now), and the other one calls close explicitly and asserts the same, what do you think? (we could reuse most of the code for the 2 tests) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] HOTFIX: fix failed cases in FeatureCommandTest [kafka]
mumrah commented on PR #17287: URL: https://github.com/apache/kafka/pull/17287#issuecomment-2377227125 Looks like the tests were failing on the PR, so this should not have been merged. https://github.com/apache/kafka/actions/runs/11027485627 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] HOTFIX: fix failed cases in FeatureCommandTest [kafka]
mumrah commented on PR #17287: URL: https://github.com/apache/kafka/pull/17287#issuecomment-2377232171 @chia7712 I agree we should eventually have some "acceptance" tests which do some high-level sanity checks. In this case, a change to MetadataVersion (located in server-common) will cause pretty much all tests to run, except maybe clients. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16394: Fix null propagation in foreign key join result [kafka]
wrwksexahatvani commented on PR #15607: URL: https://github.com/apache/kafka/pull/15607#issuecomment-2376945505 @mjsax @wcarlson5 @gongxuanzhang This is a major blocker, as it leads to data loss; this has happened in dozens of my projects. When will the fix for this be integrated? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17480: New consumer commit all consumed should retrieve offsets in background thread [kafka]
lianetm commented on code in PR #17150: URL: https://github.com/apache/kafka/pull/17150#discussion_r1777058782 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ## @@ -162,23 +163,44 @@ private void process(final PollEvent event) { } private void process(final AsyncCommitEvent event) { -if (!requestManagers.commitRequestManager.isPresent()) { -return; -} - -CommitRequestManager manager = requestManagers.commitRequestManager.get(); -CompletableFuture<Void> future = manager.commitAsync(event.offsets()); -future.whenComplete(complete(event.future())); +process((CommitEvent) event); Review Comment: We have 2 separate event types (async and sync), to then join them together in one here, to then split them again for the actual process with: ``` if (event.type() == Type.COMMIT_ASYNC) { future = manager.commitAsync(offsets); } else { future = manager.commitSync(offsets, event.deadlineMs()); } ``` I get that with this we can reuse a bit, but I wonder if it's worth the twisted flow. Could we maybe keep them separate (as they originally are when the events are created), with process(Sync) ending up calling manager.commitSync and process(Async) calling manager.commitAsync, and just encapsulate in funcs what we want to reuse in both? (ex. maybeUpdateLastSeenEpochIfNewer() with lines 188-191 that would be called from both; similar for the logic to retrieve offsets from the event, ln 180-181). What do you think? ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java: ## @@ -208,6 +206,110 @@ public void testSeekUnvalidatedEventWithException() { assertInstanceOf(IllegalStateException.class, e.getCause()); } +@ParameterizedTest +@ValueSource(booleans = {true, false}) +public void testSyncCommitEventWithOffsets(boolean withGroupId) { Review Comment: since this is testing commit, it does need a group id, so it should only run for withGroupId=true, I expect ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java: ## @@ -208,6 +206,110 @@ public void testSeekUnvalidatedEventWithException() { assertInstanceOf(IllegalStateException.class, e.getCause()); } +@ParameterizedTest +@ValueSource(booleans = {true, false}) +public void testSyncCommitEventWithOffsets(boolean withGroupId) { +final long deadlineMs = 12345; +TopicPartition tp = new TopicPartition("topic", 0); +Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(tp, new OffsetAndMetadata(10, Optional.of(1), null)); +SyncCommitEvent event = new SyncCommitEvent(offsets, false, deadlineMs); + +setupProcessor(withGroupId); +if (withGroupId) { +doReturn(true).when(metadata).updateLastSeenEpochIfNewer(tp, 1); +doReturn(CompletableFuture.completedFuture(null)).when(commitRequestManager).commitSync(offsets, deadlineMs); +} + +processor.process(event); +verify(subscriptionState, never()).allConsumed(); +if (withGroupId) { +verify(metadata).updateLastSeenEpochIfNewer(tp, 1); +verify(commitRequestManager).commitSync(offsets, deadlineMs); +} else { +verify(metadata, never()).updateLastSeenEpochIfNewer(tp, 1); +verify(commitRequestManager, never()).commitSync(offsets, deadlineMs); +} +assertDoesNotThrow(() -> event.future().get()); +} + +@ParameterizedTest +@ValueSource(booleans = {true, false}) +public void testSyncCommitEventWithCommitAllConsumed(boolean withGroupId) { Review Comment: same, only relevant for withGroupId=true, right? (and all the other commit tests down below) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
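A sketch of the separation suggested in the review above, reusing the names visible in the diff (requestManagers, complete(), commitSync/commitAsync); `offsetsFromEvent` and `maybeUpdateLastSeenEpochIfNewer` are the hypothetical shared helpers, and this is a fragment of ApplicationEventProcessor rather than the actual patch:

```java
private void process(final SyncCommitEvent event) {
    if (!requestManagers.commitRequestManager.isPresent()) {
        return;
    }
    CommitRequestManager manager = requestManagers.commitRequestManager.get();
    // shared helpers: resolve offsets from the event, then update last seen epochs
    Map<TopicPartition, OffsetAndMetadata> offsets = offsetsFromEvent(event);
    maybeUpdateLastSeenEpochIfNewer(offsets);
    CompletableFuture<Void> future = manager.commitSync(offsets, event.deadlineMs());
    future.whenComplete(complete(event.future()));
}

private void process(final AsyncCommitEvent event) {
    if (!requestManagers.commitRequestManager.isPresent()) {
        return;
    }
    CommitRequestManager manager = requestManagers.commitRequestManager.get();
    Map<TopicPartition, OffsetAndMetadata> offsets = offsetsFromEvent(event);
    maybeUpdateLastSeenEpochIfNewer(offsets);
    CompletableFuture<Void> future = manager.commitAsync(offsets);
    future.whenComplete(complete(event.future()));
}
```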
Re: [PR] KAFKA-17488: Cleanup (test) code for Kafka Streams "metric version" [kafka]
fonsdant commented on PR #17182: URL: https://github.com/apache/kafka/pull/17182#issuecomment-2376967494 I must have accidentally reverted the removal of the `builtInMetricsVersion` argument passed to `StreamsMetricsImpl` in `TopologyTestDriver`, which caused the build to fail. The last commit fixed it. Also, I have removed the other `builtInMetricsVersion` values that had not already been removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-17622) Kafka Streams Timeout During Partition Rebalance
[ https://issues.apache.org/jira/browse/KAFKA-17622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alieh Saeedi updated KAFKA-17622: - Summary: Kafka Streams Timeout During Partition Rebalance (was: Kafka Streams Timeout During Partition Rebalance - Seeking Insights on NotLeaderOrFollowerException) > Kafka Streams Timeout During Partition Rebalance > - > > Key: KAFKA-17622 > URL: https://issues.apache.org/jira/browse/KAFKA-17622 > Project: Kafka > Issue Type: Bug >Reporter: Alieh Saeedi >Assignee: Alieh Saeedi >Priority: Major > > Re: > [https://forum.confluent.io/t/kafka-streams-timeout-during-partition-rebalance-seeking-insights-on-notleaderorfollowerexception/11362] > Calling {{Consumer.position()}} from KStreams for computing the offset > that must be committed suffers from a race condition, so that by the time we > want to commit, the position may be gone. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17582) Unpredictable consumer position after transaction abort
[ https://issues.apache.org/jira/browse/KAFKA-17582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17885032#comment-17885032 ] Kyle Kingsbury commented on KAFKA-17582: I've done some more digging here, and written a Jepsen test specifically for rewind-vs-advance behavior: [https://github.com/jepsen-io/redpanda/blob/main/src/jepsen/redpanda/workload/abort.clj]. Try something like: {{lein run test --db kafka -w abort --safe --concurrency 5n --sub-via assign --rate 1000 --time-limit 120}} I don't have clear answers yet, but it seems like this behavior is sensitive to whether you do all the writes in advance and only perform read-only transactions, or if you mix writes into the reading transactions as well. I also suspect that maybe advancing *is* the default behavior (uh oh) and rewinding is actually a consequence of a rebalance. > Unpredictable consumer position after transaction abort > --- > > Key: KAFKA-17582 > URL: https://issues.apache.org/jira/browse/KAFKA-17582 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, documentation >Affects Versions: 3.8.0 >Reporter: Kyle Kingsbury >Priority: Critical > Labels: abort, offset, transaction > Attachments: 20240919T124411.740-0500(1).zip, Screenshot from > 2024-09-19 18-45-34.png > > > With the official Kafka Java client, version 3.8.0, the position of consumers > after a transaction aborts appears unpredictable. Sometimes the consumer > moves on, skipping over the records it polled in the aborted transaction. > Sometimes it rewinds to read them again. Sometimes it rewinds *further* than > the most recent transaction. > Since the goal of transactions is to enable "exactly-once semantics", it > seems sensible that the consumer should rewind on abort, such that any > subsequent transactions would start at the same offsets. Not rewinding leads > to data loss, since messages are consumed but their effects are not > committed. Rewinding too far is... just weird. > I'm seeing this issue in Jepsen tests of Kafka 3.0.0 and other > Kafka-compatible systems. It occurs without faults, and with a single > producer and consumer; no other concurrent processes. Here's the producer and > consumer config: > > {{{}Producer config: {"socket.connection.setup.timeout.max.ms" 1000, > "transactional.id" "jt1", "bootstrap.servers" "n3:9092", "request.timeout.ms" > 3000, "enable.idempotence" true, "max.block.ms" 1, "value.serializer" > "org.apache.kafka.common.serialization.LongSerializer", "retries" 1000, > "key.serializer" "org.apache.kafka.common.serialization.LongSerializer", > "socket.connection.setup.timeout.ms" 500, "reconnect.backoff.max.ms" 1000, > "delivery.timeout.ms" 1, "acks" "all", "transaction.timeout.ms" 1000{ > {{{}Consumer config: {"socket.connection.setup.timeout.max.ms" 1000, > "bootstrap.servers" "n5:9092", "request.timeout.ms" 1, > "connections.max.idle.ms" 6, "session.timeout.ms" 6000, > "heartbeat.interval.ms" 300, "key.deserializer" > "org.apache.kafka.common.serialization.LongDeserializer", "group.id" > "jepsen-group", "metadata.max.age.ms" 6, "auto.offset.reset" "earliest", > "isolation.level" "read_committed", "socket.connection.setup.timeout.ms" 500, > "value.deserializer" > "org.apache.kafka.common.serialization.LongDeserializer", > "enable.auto.commit" false, "default.api.timeout.ms" 1{ > > Attached is a test run that shows this behavior, as well as a visualization > of the reads (polls) and writes (sends) of a single topic-partition.
> In this plot, time flows down, and offsets run left to right. Each > transaction is a single horizontal line. `w1` denotes a send of value 1, and > `r2` denotes a poll of read 2. All operations here are performed by the sole > process in the system, which has a single Kafka consumer and a single Kafka > client. First, a transaction writes 35 and commits. Second, a transaction > reads 35 and aborts. Third, a transaction reads 35 and aborts: the consumer > has clearly re-wound to show the same record twice. > Then a transaction writes 37. Immediately thereafter a transaction reads 37 > and 38. Unlike before, it did *not* rewind. This transaction also aborts. > Finally, a transaction writes 39 and 40. Then a transaction reads 39 and 40. > This transaction commits! Values 35, 37, and 38 have been lost! > It doesn't seem possible that this is the effect of a consumer rebalance: > rebalancing should start off the consumer at the last *committed* offset, and > the last committed offset in this history was actually value 31–it should > have picked up at 35, 37, etc. This test uses auto.offset.reset=earliest, so > if the commit were somehow missing, it should have rewound to the
Re: [PR] KAFKA-17581: AsyncKafkaConsumer can't unsubscribe invalid topics [kafka]
lianetm commented on PR #17244: URL: https://github.com/apache/kafka/pull/17244#issuecomment-2377319827 Regarding: > I haven't had a chance to dig into the root cause as I'm curious what can be done at that layer so that the consumer doesn't have to be aware of it. My thoughts, in case it helps. As I understand it, the root cause is a combination of: 1. the consumer keeps requesting metadata for a topic (invalid) while "needed" (until it unsubscribes) -> this is common for both consumers, because it considers it can "recover" from it https://github.com/apache/kafka/blob/7c429f3514dd50e98f42c74595c35b5f0b32b7d7/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L643 (where recovering means no longer being subscribed, i.e. unsubscribing) 2. the legacy consumer does not poll the network client in this test (no need to send a leave because it's not in the group), but the async consumer does poll (the background thread continuously polls the network client) So I think it makes sense to handle this at the unsubscribe level, where we understand why it's ok to ignore the exception (because it's actually the way to recover from it). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
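A sketch of what handling it at the unsubscribe level could look like; the method and field names here (`releaseAssignmentAndLeaveGroup`, `subscriptions`, `log`) are illustrative, not the actual AsyncKafkaConsumer code:

```java
public void unsubscribe() {
    try {
        // Releasing the assignment may surface the InvalidTopicException that
        // metadata kept propagating while the invalid topic was still "needed".
        releaseAssignmentAndLeaveGroup();
    } catch (InvalidTopicException e) {
        // Safe to swallow: unsubscribing is precisely how the consumer stops
        // needing metadata for the invalid topic, i.e. the recovery path itself.
        log.debug("Ignoring metadata error for invalid topics during unsubscribe", e);
    }
    subscriptions.unsubscribe();
}
```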
Re: [PR] KAFKA-16792: Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0) [kafka]
FrankYang0529 commented on code in PR #16982: URL: https://github.com/apache/kafka/pull/16982#discussion_r1776899718 ## clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java: ## @@ -2486,68 +2492,96 @@ public void testCurrentLag(GroupProtocol groupProtocol) { // poll once to update with the current metadata consumer.poll(Duration.ofMillis(0)); +TestUtils.waitForCondition(() -> requestGenerated(client, ApiKeys.FIND_COORDINATOR), +"No metadata requests sent"); client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, metadata.fetch().nodes().get(0))); // no error for no current position assertEquals(OptionalLong.empty(), consumer.currentLag(tp0)); -assertEquals(0, client.inFlightRequestCount()); - +if (groupProtocol == GroupProtocol.CLASSIC) { +// Classic consumer does not send the LIST_OFFSETS right away (requires an explicit poll), +// different from the new async consumer, that will send the LIST_OFFSETS request in the background thread +// on the next background thread poll. +assertEquals(0, client.inFlightRequestCount()); +} // poll once again, which should send the list-offset request consumer.seek(tp0, 50L); consumer.poll(Duration.ofMillis(0)); // requests: list-offset, fetch -assertEquals(2, client.inFlightRequestCount()); +TestUtils.waitForCondition(() -> { +boolean hasListOffsetRequest = requestGenerated(client, ApiKeys.LIST_OFFSETS); +boolean hasFetchRequest = requestGenerated(client, ApiKeys.FETCH); +return hasListOffsetRequest && hasFetchRequest; +}, "No list-offset & fetch request sent"); // no error for no end offset (so unknown lag) assertEquals(OptionalLong.empty(), consumer.currentLag(tp0)); // poll once again, which should return the list-offset response // and hence next call would return correct lag result -client.respond(listOffsetsResponse(singletonMap(tp0, 90L))); +ClientRequest listOffsetRequest = findRequest(client, ApiKeys.LIST_OFFSETS); +client.respondToRequest(listOffsetRequest, listOffsetsResponse(singletonMap(tp0, 90L))); consumer.poll(Duration.ofMillis(0)); -assertEquals(OptionalLong.of(40L), consumer.currentLag(tp0)); +// For AsyncKafkaConsumer, subscription state is updated in the background, so the result will eventually be updated. +TestUtils.waitForCondition(() -> { +OptionalLong result = consumer.currentLag(tp0); +return result.isPresent() && result.getAsLong() == 40L; +}, "Subscription state is not updated"); // requests: fetch -assertEquals(1, client.inFlightRequestCount()); +TestUtils.waitForCondition(() -> requestGenerated(client, ApiKeys.FETCH), "No fetch request sent"); // one successful fetch should update the log end offset and the position +ClientRequest fetchRequest = findRequest(client, ApiKeys.FETCH); final FetchInfo fetchInfo = new FetchInfo(1L, 99L, 50L, 5); -client.respond(fetchResponse(singletonMap(tp0, fetchInfo))); +client.respondToRequest(fetchRequest, fetchResponse(singletonMap(tp0, fetchInfo))); final ConsumerRecords records = (ConsumerRecords) consumer.poll(Duration.ofMillis(1)); assertEquals(5, records.count()); assertEquals(55L, consumer.position(tp0)); // correct lag result -assertEquals(OptionalLong.of(45L), consumer.currentLag(tp0)); +// For AsyncKafkaConsumer, subscription state is updated in the background, so the result will eventually be updated. +TestUtils.waitForCondition(() -> { Review Comment: Yes, you're right, if `consumer.position` can get `45`, then the subscription state has already been updated. Remove `TestUtils.waitForCondition` here. Thanks.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17505: New consumer seekToBeginning/End should run in background thread [kafka]
lianetm commented on code in PR #17230: URL: https://github.com/apache/kafka/pull/17230#discussion_r1776850858 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -842,8 +843,8 @@ public void seekToEnd(Collection<TopicPartition> partitions) { acquireAndEnsureOpen(); try { -Collection<TopicPartition> parts = partitions.isEmpty() ? subscriptions.assignedPartitions() : partitions; -subscriptions.requestOffsetReset(parts, OffsetResetStrategy.LATEST); +cachedSubscriptionHasAllFetchPositions = false; +applicationEventHandler.add(new ResetOffsetEvent(partitions, OffsetResetStrategy.LATEST)); Review Comment: Hey here, sorry for the late reply. I agree that these 3 methods should have similar behaviour, but I wonder if the case is that they should all use `addAndGet`? My concern is, when seeking (with any of the 3 methods), we do expect that as soon as they complete, the position is updated so that other funcs will see it (ex. currentLag, poll). We would not be respecting that anymore if we use add: seek could complete without having updated the offsets, right? The seek contract states that it "Overrides the fetch offsets that the consumer will use on the next {@link #poll(Duration) poll(timeout)}", so we need to: 1. override fetch offsets (this requires `addAndGet` I expect) 2. make sure they are available on the next poll (again addAndGet on seek; otherwise consumer.seek + consumer.poll may not find the offsets if the poll happens while the `seek` with `add` is still being processed in the background, right?). Also, calls like currentLag after that seek + add might not have the info they need, I guess. Thoughts? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
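A sketch of the blocking variant described above, using the ResetOffsetEvent from this PR but with an illustrative deadline argument (the actual event constructor and deadline plumbing may differ):

```java
public void seekToEnd(Collection<TopicPartition> partitions) {
    acquireAndEnsureOpen();
    try {
        // Block until the background thread has applied the reset, so that a
        // poll() or currentLag() issued right after seekToEnd() already sees
        // the overridden fetch positions.
        applicationEventHandler.addAndGet(
            new ResetOffsetEvent(partitions, OffsetResetStrategy.LATEST,
                calculateDeadlineMs(time, defaultApiTimeoutMs))); // deadline arg is an assumption
    } finally {
        release();
    }
}
```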
[jira] [Resolved] (KAFKA-15266) Static configs set for non primary synonyms are ignored for Log configs
[ https://issues.apache.org/jira/browse/KAFKA-15266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kamal Chandraprakash resolved KAFKA-15266. -- Resolution: Duplicate > Static configs set for non primary synonyms are ignored for Log configs > --- > > Key: KAFKA-15266 > URL: https://issues.apache.org/jira/browse/KAFKA-15266 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.6.0 >Reporter: Aman Harish Gandhi >Assignee: Aman Harish Gandhi >Priority: Major > > In our server.properties we had the following config > {code:java} > log.retention.hours=48 > {code} > We noticed that after running alter configs to update a broker-level config > (for a config unrelated to retention) we were only deleting data after 7 days > instead of the configured 2. > The alter-configs command we had run was similar to this > {code:java} > sh kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config > "log.segment.bytes=50" > {code} > Digging deeper, the issue could be pinpointed to the reconfigure block of > DynamicLogConfig inside DynamicBrokerConfig. Here we only look at the > "primary" KafkaConfig synonym of the LogConfig, and if it is not set then we > remove the value set in the default log config as well. This eventually leads > to retention.ms not being set in the default log config, and that leads to > the default value of 7 days being used. The value set in > "log.retention.hours" is completely ignored in this case. > Pasting the relevant code block here > {code:java} > newConfig.valuesFromThisConfig.forEach { (k, v) => > if (DynamicLogConfig.ReconfigurableConfigs.contains(k)) { > DynamicLogConfig.KafkaConfigToLogConfigName.get(k).foreach { configName => > if (v == null) > newBrokerDefaults.remove(configName) > else > newBrokerDefaults.put(configName, v.asInstanceOf[AnyRef]) > } > } > } {code} > In the above block `DynamicLogConfig.ReconfigurableConfigs` contains only > log.retention.ms. It does not contain the other synonyms like > `log.retention.minutes` or `log.retention.hours`. > This issue seems prevalent in all cases where there is more than one > KafkaConfig synonym for the LogConfig. -- This message was sent by Atlassian Jira (v8.20.10#820010)
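For illustration, the fallback the reporter is describing amounts to resolving the value through Kafka's documented synonym precedence (log.retention.ms over log.retention.minutes over log.retention.hours) instead of dropping it when only the primary is absent. A self-contained sketch, not the actual DynamicBrokerConfig code:

{code:java}
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class RetentionSynonymSketch {
    // Resolve retention in ms through the documented precedence chain:
    // log.retention.ms > log.retention.minutes > log.retention.hours.
    static Long resolveRetentionMs(Map<String, String> staticConfigs) {
        String ms = staticConfigs.get("log.retention.ms");
        if (ms != null) return Long.parseLong(ms);
        String minutes = staticConfigs.get("log.retention.minutes");
        if (minutes != null) return TimeUnit.MINUTES.toMillis(Long.parseLong(minutes));
        String hours = staticConfigs.get("log.retention.hours");
        if (hours != null) return TimeUnit.HOURS.toMillis(Long.parseLong(hours));
        return null; // fall back to the broker default of 7 days
    }

    public static void main(String[] args) {
        // With only log.retention.hours=48 set, the resolved value must be 48h, not 7d.
        System.out.println(resolveRetentionMs(Map.of("log.retention.hours", "48"))); // 172800000
    }
}
{code}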
Re: [PR] Refactored and fixed minor share fetch code (KIP-932) [kafka]
mumrah merged PR #17269: URL: https://github.com/apache/kafka/pull/17269 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-17618: group consumer heartbeat interval should be less than session timeout [kafka]
FrankYang0529 opened a new pull request, #17281: URL: https://github.com/apache/kafka/pull/17281 [KIP-848](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217387038#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-Heartbeat&Session) mentions: > The member is expected to heartbeat every group.consumer.heartbeat.interval.ms in order to keep its session opened. If it does not heartbeat at least once within the group.consumer.session.timeout.ms, the group coordinator will kick the member out from the group. To avoid misconfiguration, we can add validation for it. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
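The check itself could be as small as the sketch below; where exactly it hooks into broker config validation is left open here, and the method name is illustrative:

```java
import org.apache.kafka.common.config.ConfigException;

final class GroupConfigValidationSketch {
    // Reject configurations where a member could never heartbeat within its session.
    static void validateHeartbeatVsSessionTimeout(int heartbeatIntervalMs, int sessionTimeoutMs) {
        if (heartbeatIntervalMs >= sessionTimeoutMs) {
            throw new ConfigException("group.consumer.heartbeat.interval.ms (" + heartbeatIntervalMs
                + ") must be less than group.consumer.session.timeout.ms (" + sessionTimeoutMs + ")");
        }
    }
}
```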
Re: [PR] KAFKA-17367: Share coordinator impl. New merge batches algorithm. [3/N] [kafka]
mumrah commented on code in PR #17149: URL: https://github.com/apache/kafka/pull/17149#discussion_r1776991897 ## share-coordinator/src/main/java/org/apache/kafka/coordinator/share/StateBatchUtil.java: ## @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.share; + +import org.apache.kafka.coordinator.share.generated.ShareUpdateValue; +import org.apache.kafka.server.share.PersisterStateBatch; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.TreeSet; + +public class StateBatchUtil { +/** + * Util method which takes in 2 lists containing {@link PersisterStateBatch} + * and the startOffset. + * This method removes any batches where the lastOffset < startOffset, if the startOffset > -1. + * It then merges any contiguous intervals with same state. If states differ, + * based on various conditions it creates new non-overlapping batches preferring new ones. + * @param batchesSoFar - List containing current soft state of {@link PersisterStateBatch} + * @param newBatches - List containing {@link PersisterStateBatch} in incoming request + * @param startOffset - startOffset to consider when removing old batches. + * @return List containing combined batches + */ +public static List<PersisterStateBatch> combineStateBatches( +List<PersisterStateBatch> batchesSoFar, +List<PersisterStateBatch> newBatches, +long startOffset +) { +List<PersisterStateBatch> combinedList = new ArrayList<>(batchesSoFar.size() + newBatches.size()); +combinedList.addAll(batchesSoFar); +combinedList.addAll(newBatches); + +return mergeBatches( +pruneBatches( +combinedList, +startOffset +) +); +} + +/** + * Encapsulates the main merge algorithm. Consider 2 batches (A, B): + * - Same state (delivery count and state) + * - If overlapping - merge into single batch + * - If contiguous (A.lastOffset + 1 == B.firstOffset) - merge batches into a single one + * - Different state (delivery count or state differ) + * - Based on various cases: + * - swallow lower priority batch within bounds of offsets + * - break batch into other non-overlapping batches + * @param batches - List of {@link PersisterStateBatch} + * @return List of non-overlapping {@link PersisterStateBatch} + */ +private static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch> batches) { +if (batches.size() < 2) { +return batches; +} +TreeSet<PersisterStateBatch> sortedBatches = new TreeSet<>(batches); +List<PersisterStateBatch> finalBatches = new ArrayList<>(batches.size() * 2); // heuristic size + +BatchOverlapState overlapState = getOverlappingState(sortedBatches); + +while (overlapState != BatchOverlapState.SENTINEL) { +PersisterStateBatch last = overlapState.last(); +PersisterStateBatch candidate = overlapState.candidate(); + +// remove non overlapping prefix from sortedBatches, +// will make getting next overlapping pair efficient +// as a prefix batch which is non overlapping will only +// be checked once. +if (overlapState.nonOverlapping() != null) { +overlapState.nonOverlapping().forEach(sortedBatches::remove); +finalBatches.addAll(overlapState.nonOverlapping()); +} + +if (candidate == null) { +overlapState = BatchOverlapState.SENTINEL; +continue; +} + +// remove both last and candidate for easier +// assessment about adding batches to sortedBatches +sortedBatches.remove(last); +sortedBatches.remove(candidate); + +// overlap and same state (last.firstOffset <= candidate.firstOffset) due to sort +// covers: +// case:12 34 5 6 7 (contiguous) +// last:__ ______ ___ ___ ___ +// candidate: __
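For intuition, here is a self-contained sketch of just the same-state half of the merge: overlapping or contiguous intervals with identical delivery state collapse into one. The priority-based splitting for differing states (the cases enumerated above) is deliberately omitted, and the Batch record is a stand-in for PersisterStateBatch, not the real class:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SameStateMergeSketch {
    // Simplified stand-in for PersisterStateBatch: an offset interval plus its state.
    record Batch(long firstOffset, long lastOffset, byte state, short deliveryCount) {}

    // Merge batches that overlap or are contiguous (a.lastOffset + 1 == b.firstOffset)
    // and share the same state; different-state overlaps are not handled here.
    static List<Batch> mergeSameState(List<Batch> batches) {
        List<Batch> sorted = new ArrayList<>(batches);
        sorted.sort(Comparator.comparingLong(Batch::firstOffset).thenComparingLong(Batch::lastOffset));
        List<Batch> merged = new ArrayList<>();
        for (Batch b : sorted) {
            if (!merged.isEmpty()) {
                Batch last = merged.get(merged.size() - 1);
                boolean sameState = last.state() == b.state() && last.deliveryCount() == b.deliveryCount();
                if (sameState && b.firstOffset() <= last.lastOffset() + 1) {
                    // extend the previous interval instead of emitting a new one
                    merged.set(merged.size() - 1, new Batch(last.firstOffset(),
                        Math.max(last.lastOffset(), b.lastOffset()), last.state(), last.deliveryCount()));
                    continue;
                }
            }
            merged.add(b);
        }
        return merged;
    }
}
```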
Re: [PR] KAFKA-17612: Remove some tests that only apply to ZK mode or migration [kafka]
chia7712 merged PR #17276: URL: https://github.com/apache/kafka/pull/17276 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-17621) Reduce logging verbosity on ConsumerGroupHeartbeat path
David Jacot created KAFKA-17621: --- Summary: Reduce logging verbosity on ConsumerGroupHeartbeat path Key: KAFKA-17621 URL: https://issues.apache.org/jira/browse/KAFKA-17621 Project: Kafka Issue Type: Sub-task Reporter: David Jacot Assignee: David Jacot -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17618) group consumer heartbeat interval should be less than session timeout
[ https://issues.apache.org/jira/browse/KAFKA-17618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17618: -- Component/s: clients consumer > group consumer heartbeat interval should be less than session timeout > - > > Key: KAFKA-17618 > URL: https://issues.apache.org/jira/browse/KAFKA-17618 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: PoAn Yang >Assignee: PoAn Yang >Priority: Minor > Labels: kip-848 > > [KIP-848|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217387038#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-Heartbeat&Session] > mentions: > bq. The member is expected to heartbeat every > group.consumer.heartbeat.interval.ms in order to keep its session opened. If > it does not heartbeat at least once within the > group.consumer.session.timeout.ms, the group coordinator will kick the member > out from the group. > To prevent users from configuring _group.consumer.heartbeat.interval.ms_ bigger than > _group.consumer.session.timeout.ms_, we can add validation for it. > We can do similar validation for _group.share.heartbeat.interval.ms_ and > _group.share.session.timeout.ms_ as well. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16343: Add unit tests of foreignKeyJoin classes [kafka]
wrwksexahatvani commented on PR #15564: URL: https://github.com/apache/kafka/pull/15564#issuecomment-2376930948 @wcarlson5 @mjsax In which release will these tests, and of course the fix in KAFKA-16394, be integrated? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-17618) group consumer heartbeat interval should be less than session timeout
[ https://issues.apache.org/jira/browse/KAFKA-17618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17618: -- Component/s: (was: clients) (was: consumer) > group consumer heartbeat interval should be less than session timeout > - > > Key: KAFKA-17618 > URL: https://issues.apache.org/jira/browse/KAFKA-17618 > Project: Kafka > Issue Type: Task >Reporter: PoAn Yang >Assignee: PoAn Yang >Priority: Minor > Labels: kip-848 > > [KIP-848|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217387038#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-Heartbeat&Session] > mentions: > bq. The member is expected to heartbeat every > group.consumer.heartbeat.interval.ms in order to keep its session opened. If > it does not heartbeat at least once within the > group.consumer.session.timeout.ms, the group coordinator will kick the member > out from the group. > To prevent users from configuring _group.consumer.heartbeat.interval.ms_ bigger than > _group.consumer.session.timeout.ms_, we can add validation for it. > We can do similar validation for _group.share.heartbeat.interval.ms_ and > _group.share.session.timeout.ms_ as well. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17154: New consumer subscribe may join group without a call to consumer.poll [kafka]
lianetm commented on PR #17165: URL: https://github.com/apache/kafka/pull/17165#issuecomment-2377143597 FYI, I filed https://issues.apache.org/jira/browse/KAFKA-17623 for a flaky consumer integration test that I noticed here; it has been flaky for a while. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-17623) Flaky testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback
Lianet Magrans created KAFKA-17623: -- Summary: Flaky testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback Key: KAFKA-17623 URL: https://issues.apache.org/jira/browse/KAFKA-17623 Project: Kafka Issue Type: Bug Components: consumer Reporter: Lianet Magrans Flaky for the new consumer, failing with : org.apache.kafka.common.KafkaException: User rebalance callback throws an error at app//org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:259) at app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.invokeRebalanceCallbacks(AsyncKafkaConsumer.java:1867) at app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer$BackgroundEventProcessor.process(AsyncKafkaConsumer.java:195) at app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer$BackgroundEventProcessor.process(AsyncKafkaConsumer.java:181) at app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.processBackgroundEvents(AsyncKafkaConsumer.java:1758) at app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.updateAssignmentMetadataIfNeeded(AsyncKafkaConsumer.java:1618) ... Caused by: java.lang.IllegalStateException: No current assignment for partition topic-0 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:378) at org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:395) at org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor.process(ApplicationEventProcessor.java:425) at org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor.process(ApplicationEventProcessor.java:147) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.processApplicationEvents(ConsumerNetworkThread.java:171) Flaky behaviour: https://ge.apache.org/scans/tests?search.buildOutcome=failure&search.names=Git%20branch&search.rootProjectNames=kafka&search.startTimeMax=172740959&search.startTimeMin=172248480&search.timeZoneId=America%2FToronto&search.values=trunk&tests.container=integration.kafka.api.PlaintextConsumerCallbackTest&tests.test=testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback(String%2C%20String)%5B3%5D -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17154: New consumer subscribe may join group without a call to consumer.poll [kafka]
FrankYang0529 commented on PR #17165: URL: https://github.com/apache/kafka/pull/17165#issuecomment-2377164009 > FYI, I filed https://issues.apache.org/jira/browse/KAFKA-17623 for a flaky consumer integration test that I noticed here, has been flaky for a while. Okay. I will take a look. Thanks for filing the issue. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-17623) Flaky testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback
[ https://issues.apache.org/jira/browse/KAFKA-17623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17885047#comment-17885047 ] PoAn Yang commented on KAFKA-17623: --- Hi [~lianetm], may I take this? Thank you. > Flaky > testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback > > > Key: KAFKA-17623 > URL: https://issues.apache.org/jira/browse/KAFKA-17623 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor > > Flaky for the new consumer, failing with : > org.apache.kafka.common.KafkaException: User rebalance callback throws an > error at > app//org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:259) > at > app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.invokeRebalanceCallbacks(AsyncKafkaConsumer.java:1867) > at > app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer$BackgroundEventProcessor.process(AsyncKafkaConsumer.java:195) > at > app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer$BackgroundEventProcessor.process(AsyncKafkaConsumer.java:181) > at > app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.processBackgroundEvents(AsyncKafkaConsumer.java:1758) > at > app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.updateAssignmentMetadataIfNeeded(AsyncKafkaConsumer.java:1618) > ... > Caused by: java.lang.IllegalStateException: No current assignment for > partition topic-0 at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:378) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:395) > at > org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor.process(ApplicationEventProcessor.java:425) > at > org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor.process(ApplicationEventProcessor.java:147) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.processApplicationEvents(ConsumerNetworkThread.java:171) > > Flaky behaviour: > > https://ge.apache.org/scans/tests?search.buildOutcome=failure&search.names=Git%20branch&search.rootProjectNames=kafka&search.startTimeMax=172740959&search.startTimeMin=172248480&search.timeZoneId=America%2FToronto&search.values=trunk&tests.container=integration.kafka.api.PlaintextConsumerCallbackTest&tests.test=testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback(String%2C%20String)%5B3%5D -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] (WIP) MINOR: Cache topic resolution in TopicIds set [kafka]
dajac closed pull request #16527: (WIP) MINOR: Cache topic resolution in TopicIds set URL: https://github.com/apache/kafka/pull/16527 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17317: Validate and maybe trigger downgrade after static member replacement [kafka]
dongnuo123 closed pull request #17008: KAFKA-17317: Validate and maybe trigger downgrade after static member replacement URL: https://github.com/apache/kafka/pull/17008 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-17623) Flaky testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback
[ https://issues.apache.org/jira/browse/KAFKA-17623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17885049#comment-17885049 ] Lianet Magrans commented on KAFKA-17623: Sure, thanks! > Flaky > testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback > > > Key: KAFKA-17623 > URL: https://issues.apache.org/jira/browse/KAFKA-17623 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Lianet Magrans >Assignee: PoAn Yang >Priority: Major > Labels: consumer-threading-refactor > > Flaky for the new consumer, failing with : > org.apache.kafka.common.KafkaException: User rebalance callback throws an > error at > app//org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:259) > at > app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.invokeRebalanceCallbacks(AsyncKafkaConsumer.java:1867) > at > app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer$BackgroundEventProcessor.process(AsyncKafkaConsumer.java:195) > at > app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer$BackgroundEventProcessor.process(AsyncKafkaConsumer.java:181) > at > app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.processBackgroundEvents(AsyncKafkaConsumer.java:1758) > at > app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.updateAssignmentMetadataIfNeeded(AsyncKafkaConsumer.java:1618) > ... > Caused by: java.lang.IllegalStateException: No current assignment for > partition topic-0 at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:378) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:395) > at > org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor.process(ApplicationEventProcessor.java:425) > at > org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor.process(ApplicationEventProcessor.java:147) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.processApplicationEvents(ConsumerNetworkThread.java:171) > > Flaky behaviour: > > https://ge.apache.org/scans/tests?search.buildOutcome=failure&search.names=Git%20branch&search.rootProjectNames=kafka&search.startTimeMax=172740959&search.startTimeMin=172248480&search.timeZoneId=America%2FToronto&search.values=trunk&tests.container=integration.kafka.api.PlaintextConsumerCallbackTest&tests.test=testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback(String%2C%20String)%5B3%5D -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-17317: Validate and maybe trigger downgrade after static member replacement [kafka]
dongnuo123 opened a new pull request, #17286: URL: https://github.com/apache/kafka/pull/17286 https://issues.apache.org/jira/browse/KAFKA-17317 This patch makes the online downgrade trigger asynchronous by scheduling a timer to downgrade the group in the appendFuture of the coordinator result. To avoid duplication, the rebalance is only triggered after the downgrade conversion but not after the last consumer protocol member leaves the group. If the downgrade is triggered by static member replacement, no rebalance will be triggered. This also fixes the bug discovered in the system test, where the group can't be downgraded when the last static member using the consumer protocol is replaced by a classic member with the same instance id. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] (WIP) MINOR: Cache topic resolution in TopicIds set [kafka]
dajac commented on PR #16527: URL: https://github.com/apache/kafka/pull/16527#issuecomment-2377167228 Replaced by https://github.com/apache/kafka/pull/17285. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-17623) Flaky testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback
[ https://issues.apache.org/jira/browse/KAFKA-17623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans reassigned KAFKA-17623: -- Assignee: PoAn Yang > Flaky > testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback > > > Key: KAFKA-17623 > URL: https://issues.apache.org/jira/browse/KAFKA-17623 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Lianet Magrans >Assignee: PoAn Yang >Priority: Major > Labels: consumer-threading-refactor > > Flaky for the new consumer, failing with : > org.apache.kafka.common.KafkaException: User rebalance callback throws an > error at > app//org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:259) > at > app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.invokeRebalanceCallbacks(AsyncKafkaConsumer.java:1867) > at > app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer$BackgroundEventProcessor.process(AsyncKafkaConsumer.java:195) > at > app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer$BackgroundEventProcessor.process(AsyncKafkaConsumer.java:181) > at > app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.processBackgroundEvents(AsyncKafkaConsumer.java:1758) > at > app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.updateAssignmentMetadataIfNeeded(AsyncKafkaConsumer.java:1618) > ... > Caused by: java.lang.IllegalStateException: No current assignment for > partition topic-0 at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:378) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:395) > at > org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor.process(ApplicationEventProcessor.java:425) > at > org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor.process(ApplicationEventProcessor.java:147) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.processApplicationEvents(ConsumerNetworkThread.java:171) > > Flaky behaviour: > > https://ge.apache.org/scans/tests?search.buildOutcome=failure&search.names=Git%20branch&search.rootProjectNames=kafka&search.startTimeMax=172740959&search.startTimeMin=172248480&search.timeZoneId=America%2FToronto&search.values=trunk&tests.container=integration.kafka.api.PlaintextConsumerCallbackTest&tests.test=testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback(String%2C%20String)%5B3%5D -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17367: Share coordinator impl. New merge batches algorithm. [3/N] [kafka]
mumrah commented on code in PR #17149: URL: https://github.com/apache/kafka/pull/17149#discussion_r1776956765 ## share-coordinator/src/test/java/org/apache/kafka/coordinator/share/StateBatchUtilTest.java: ## @@ -60,36 +60,60 @@ static class BatchTestHolder { this.shouldRun = shouldRun; } +static List singleBatch( +long firstOffset, +long lastOffset, +byte deliveryState, +short deliveryCount Review Comment: I would let these be `int` so we can avoid casting in the test code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Fix a race and add JMH bench for HdrHistogram [kafka]
jeffkbkim commented on code in PR #17221: URL: https://github.com/apache/kafka/pull/17221#discussion_r1776938178 ## coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/HdrHistogramTest.java: ## @@ -172,4 +178,39 @@ public void testHistogramDataReset() { assertEquals(numEventsInFirstCycle, hdrHistogram.count(now + maxSnapshotAgeMs)); assertEquals(numEventsInSecondCycle, hdrHistogram.count(now + 1 + maxSnapshotAgeMs)); } + +@Test +public void testLatestHistogramRace() throws InterruptedException, ExecutionException { Review Comment: nit: how's testLatestHistogramShouldNotReturnUnfilledHistogram? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org