Re: [PR] KAFKA-19264: Remove fallback for thread pool sizes in RemoteLogManagerConfig [kafka]
kamalcph commented on PR #19673: URL: https://github.com/apache/kafka/pull/19673#issuecomment-2869962173 LGTM, thanks for fixing this! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Move TxnTransitMetadata to transaction-coordinator [kafka]
chia7712 merged PR #19662: URL: https://github.com/apache/kafka/pull/19662
[jira] [Commented] (KAFKA-19267) the min version used by ListOffsetsRequest should be 1 rather than 0
[ https://issues.apache.org/jira/browse/KAFKA-19267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17950784#comment-17950784 ] Chia-Ping Tsai commented on KAFKA-19267: related code: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java#L69 https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java#L84 > the min version used by ListOffsetsRequest should be 1 rather than 0 > > > Key: KAFKA-19267 > URL: https://issues.apache.org/jira/browse/KAFKA-19267 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > in the protocol, `ListOffsetsRequest` v0 is unsupported - but in the code > base we still set the oldestAllowedVersion to 0. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-19267) the min version used by ListOffsetsRequest should be 1 rather than 0
Chia-Ping Tsai created KAFKA-19267: -- Summary: the min version used by ListOffsetsRequest should be 1 rather than 0 Key: KAFKA-19267 URL: https://issues.apache.org/jira/browse/KAFKA-19267 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai in the protocol, `ListOffsetsRequest` v0 is unsupported - but in the code base we still set the oldestAllowedVersion to 0.
[jira] [Assigned] (KAFKA-19267) the min version used by ListOffsetsRequest should be 1 rather than 0
[ https://issues.apache.org/jira/browse/KAFKA-19267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yunchi Pang reassigned KAFKA-19267: --- Assignee: Yunchi Pang (was: Chia-Ping Tsai) > the min version used by ListOffsetsRequest should be 1 rather than 0 > > > Key: KAFKA-19267 > URL: https://issues.apache.org/jira/browse/KAFKA-19267 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Yunchi Pang >Priority: Minor > > in the protocol, `ListOffsetsRequest` v0 is unsupported - but in the code > base we still set the oldestAllowedVersion to 0.
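A minimal sketch of the mismatch KAFKA-19267 describes, not taken from the Kafka code base. `ListOffsetsVersionSketch`, `PROTOCOL_OLDEST` and `PROTOCOL_LATEST` are hypothetical names, and the latest version of 10 is only a placeholder; the point is that a request builder's oldest allowed version should not be lower than the oldest version the protocol still supports.

```java
// Hypothetical illustration only; none of these names exist in Apache Kafka.
final class ListOffsetsVersionSketch {
    // Assumed protocol bounds: v0 is no longer supported, so the oldest is 1.
    static final short PROTOCOL_OLDEST = 1;
    // Placeholder upper bound chosen for the example.
    static final short PROTOCOL_LATEST = 10;

    // Returns true only if the builder's version range lies inside the protocol's range.
    static boolean builderRangeIsValid(short oldestAllowed, short latestAllowed) {
        return oldestAllowed <= latestAllowed
            && oldestAllowed >= PROTOCOL_OLDEST
            && latestAllowed <= PROTOCOL_LATEST;
    }

    public static void main(String[] args) {
        // A builder that still starts at v0 falls outside the supported range.
        System.out.println(builderRangeIsValid((short) 0, (short) 10));
        // Starting at v1 matches what the protocol accepts.
        System.out.println(builderRangeIsValid((short) 1, (short) 10));
    }
}
```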
Re: [PR] MINOR: Fix unchecked type warnings in several test classes [kafka]
frankvicky commented on code in PR #19679: URL: https://github.com/apache/kafka/pull/19679#discussion_r2083898755 ## metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java: ## @@ -446,7 +446,7 @@ public void testClearElrRecordForNonExistTopic() { @Test public void testTopicDeltaElectionStatsWithEmptyImage() { -TopicImage image = new TopicImage("topic", Uuid.randomUuid(), Collections.EMPTY_MAP); +TopicImage image = new TopicImage("topic", Uuid.randomUuid(), Collections.emptyMap()); Review Comment: > Just wondering, what's the purpose of the screenshot? Prove the patch works and make the reviewer's life easier.
Re: [PR] KAFKA-19253: Improve metadata handling for share version using feature listeners (1/N) [kafka]
AndrewJSchofield commented on code in PR #19659: URL: https://github.com/apache/kafka/pull/19659#discussion_r2083611522 ## core/src/main/java/kafka/server/share/SharePartitionManager.java: ## @@ -746,6 +747,29 @@ private Consumer> failedShareAcknowledgeMetricsHandler() { }; } +/** + * The handler for share version feature metadata changes. + * @param shareVersion the new share version feature + */ +public void onShareVersionToggle(ShareVersion shareVersion) { +if (!shareVersion.supportsShareGroups()) { +// Remove all share sessions from share session cache. +synchronized (cache) { Review Comment: There's a race condition here. The user turns off share groups, which blocks new requests in `KafkaApis`. However, a request had just got past that check before the feature was downgraded. Now, this code runs which clears the share session cache, but the in-flight request calls `cache.maybeCreateSession`. We want cache additions when the feature is not enabled to be prevented I think. ## core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala: ## @@ -232,6 +240,23 @@ class BrokerMetadataPublisher( if (_firstPublish) { finishInitializingReplicaManager() } + + if (delta.featuresDelta != null) { +try { + val newFinalizedFeatures = new FinalizedFeatures(newImage.features.metadataVersionOrThrow, newImage.features.finalizedVersions, newImage.provenance.lastContainedOffset) + // Share version feature has been toggled. + if (!(newFinalizedFeatures.finalizedFeatures().getOrDefault(ShareVersion.FEATURE_NAME, 0.toShort) == finalizedShareVersion)) { +finalizedShareVersion = newFinalizedFeatures.finalizedFeatures().getOrDefault(ShareVersion.FEATURE_NAME, 0.toShort) +val shareVersion: ShareVersion = ShareVersion.fromFeatureLevel(finalizedShareVersion) +info(s"Share version has been toggled to $shareVersion") Review Comment: The message when the feature has been enabled is `Share version has been toggled to SV_1`. The string `SV_1` is only meaningful in the code. 
It would be better to say something like `Feature share.version has been updated to version 1.`
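The race described in the first review comment (a request passes the feature check, the cache is cleared, then the in-flight request adds a session) can be sketched as follows. This is an assumption-laden illustration, not the PR's fix: `GuardedSessionCache` and its methods are hypothetical stand-ins for the share session cache, and the idea shown is simply to check an "enabled" flag under the same lock that clears the cache.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a share session cache; not Kafka's real class.
final class GuardedSessionCache {
    private final Map<String, Integer> sessions = new HashMap<>();
    private boolean enabled = true; // guarded by the monitor of this object

    // Returns true if a session may exist after the call, false if the feature is off.
    synchronized boolean maybeCreateSession(String key) {
        if (!enabled) {
            // An in-flight request that passed an earlier check is rejected here.
            return false;
        }
        sessions.putIfAbsent(key, 0);
        return true;
    }

    // Flips the flag and clears the cache atomically with respect to additions.
    synchronized void onFeatureToggle(boolean supportsShareGroups) {
        enabled = supportsShareGroups;
        if (!enabled) {
            sessions.clear();
        }
    }

    synchronized int size() {
        return sessions.size();
    }

    public static void main(String[] args) {
        GuardedSessionCache cache = new GuardedSessionCache();
        System.out.println(cache.maybeCreateSession("session-a"));
        cache.onFeatureToggle(false);
        System.out.println(cache.maybeCreateSession("session-b"));
        System.out.println(cache.size());
    }
}
```

Because the flag and the map share one lock, no addition can slip in between the toggle and the clear; whether the real cache should be structured this way is a design question for the PR.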
[PR] MINOR: Minor updates to RangeSet [kafka]
squah-confluent opened a new pull request, #19678: URL: https://github.com/apache/kafka/pull/19678 Minor updates to RangeSet:
* Fix size() to return 0 when to < from
* Fix equals() so that all empty RangeSets are equal, to follow the Set interface definition better.
* Reimplement hashCode() to follow the Set interface definition.
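The three points above can be illustrated with a small sketch. It is not the PR's implementation: `RangeSetSketch` is a hypothetical class, and it assumes `from` is inclusive, `to` is exclusive, and the range is small enough that `to - from` does not overflow. `java.util.Set` defines the hash code of a set as the sum of its elements' hash codes, which for consecutive integers has a closed form.

```java
import java.util.AbstractSet;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical sketch of a set of consecutive integers [from, to).
final class RangeSetSketch extends AbstractSet<Integer> {
    private final int from; // inclusive
    private final int to;   // exclusive

    RangeSetSketch(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    public int size() {
        // An inverted range (to < from) is treated as empty rather than negative.
        return to <= from ? 0 : to - from;
    }

    @Override
    public boolean contains(Object o) {
        if (!(o instanceof Integer)) {
            return false;
        }
        int value = (Integer) o;
        return value >= from && value < to;
    }

    @Override
    public Iterator<Integer> iterator() {
        return new Iterator<Integer>() {
            private int next = from;

            @Override
            public boolean hasNext() {
                return next < to;
            }

            @Override
            public Integer next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return next++;
            }
        };
    }

    // equals() is inherited from AbstractSet: same size and same elements,
    // so every empty range equals every other empty set.

    @Override
    public int hashCode() {
        // Sum of from..to-1, computed in long and truncated like int addition would wrap.
        long n = size();
        return (int) (n * ((long) from + to - 1) / 2);
    }

    public static void main(String[] args) {
        System.out.println(new RangeSetSketch(5, 3).size());
        System.out.println(new RangeSetSketch(5, 3).equals(new RangeSetSketch(0, 0)));
        System.out.println(new RangeSetSketch(1, 4).hashCode());
    }
}
```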
Re: [PR] MINOR: fit HTML markup [kafka]
mjsax merged PR #19676: URL: https://github.com/apache/kafka/pull/19676
Re: [PR] MINOR: Fix unchecked type warnings in several test classes [kafka]
m1a2st commented on code in PR #19679: URL: https://github.com/apache/kafka/pull/19679#discussion_r2083766600 ## metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java: ## @@ -446,7 +446,7 @@ public void testClearElrRecordForNonExistTopic() { @Test public void testTopicDeltaElectionStatsWithEmptyImage() { -TopicImage image = new TopicImage("topic", Uuid.randomUuid(), Collections.EMPTY_MAP); +TopicImage image = new TopicImage("topic", Uuid.randomUuid(), Collections.emptyMap()); Review Comment: We could use `Map.of()` instead of `Collections.emptyMap()`.
Re: [PR] KAFKA-17828: Reverse Checkpointing in MM2 [kafka]
github-actions[bot] commented on PR #17593: URL: https://github.com/apache/kafka/pull/17593#issuecomment-2870720484 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please leave a comment asking for a review. If the PR has merge conflicts, update it with the latest from the base branch. If you are having difficulty finding a reviewer, please reach out on the [mailing list](https://kafka.apache.org/contact). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] MINOR: Fix unchecked type warnings in several test classes [kafka]
YutaLin commented on code in PR #19679: URL: https://github.com/apache/kafka/pull/19679#discussion_r2083773873 ## metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java: ## @@ -446,7 +446,7 @@ public void testClearElrRecordForNonExistTopic() { @Test public void testTopicDeltaElectionStatsWithEmptyImage() { -TopicImage image = new TopicImage("topic", Uuid.randomUuid(), Collections.EMPTY_MAP); +TopicImage image = new TopicImage("topic", Uuid.randomUuid(), Collections.emptyMap()); Review Comment: Hi @frankvicky, here it is. Just wondering, what's the purpose of the screenshot? https://github.com/user-attachments/assets/1e208779-94d2-4917-b971-54d97902ffda Hi @m1a2st, what's the benefit of using `Map.of()` instead of `Collections.emptyMap()`?
Re: [PR] MINOR: Fix unchecked type warnings in several test classes [kafka]
m1a2st commented on code in PR #19679: URL: https://github.com/apache/kafka/pull/19679#discussion_r2083777217 ## metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java: ## @@ -446,7 +446,7 @@ public void testClearElrRecordForNonExistTopic() { @Test public void testTopicDeltaElectionStatsWithEmptyImage() { -TopicImage image = new TopicImage("topic", Uuid.randomUuid(), Collections.EMPTY_MAP); +TopicImage image = new TopicImage("topic", Uuid.randomUuid(), Collections.emptyMap()); Review Comment: `Collections.emptyMap()` is part of the older Java API, while `Map.of()` is the modern alternative. Since other PRs are also cleaning up the legacy API usage, we should avoid using the old API in new code.
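For readers comparing the options in this thread, a small self-contained sketch using only the JDK. `EmptyMapSketch` and its helper names are hypothetical. The raw constant `Collections.EMPTY_MAP` is what triggers the unchecked warning when assigned to a parameterized type; both `Collections.emptyMap()` and `Map.of()` are type-safe, unmodifiable, and equal to each other, so the choice between those two is largely stylistic.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical helper class for comparing the two type-safe empty-map factories.
final class EmptyMapSketch {
    // Generic factory method: the type arguments are inferred, so no unchecked warning.
    static Map<String, Integer> viaCollections() {
        return Collections.emptyMap();
    }

    // Java 9+ factory: also inferred and unmodifiable.
    static Map<String, Integer> viaMapOf() {
        return Map.of();
    }

    // Returns true if the map refuses modification.
    static boolean rejectsPut(Map<String, Integer> map) {
        try {
            map.put("k", 1);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(viaCollections().equals(viaMapOf()));
        System.out.println(rejectsPut(viaCollections()));
        System.out.println(rejectsPut(viaMapOf()));
    }
}
```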
[jira] [Created] (KAFKA-19266) Eliminate flakiness in test SharePartitionTest.testAcquisitionLockTimeoutForBatchesPostStartOffsetMovement
Abhinav Dixit created KAFKA-19266: - Summary: Eliminate flakiness in test SharePartitionTest.testAcquisitionLockTimeoutForBatchesPostStartOffsetMovement Key: KAFKA-19266 URL: https://issues.apache.org/jira/browse/KAFKA-19266 Project: Kafka Issue Type: Sub-task Reporter: Abhinav Dixit There seems to be flakiness in test SharePartitionTest.testAcquisitionLockTimeoutForBatchesPostStartOffsetMovement [https://develocity.apache.org/scans/tests?search.rootProjectNames=kafka&search.timeZoneId=Asia%2FCalcutta&tests.container=kafka.server.share.SharePartitionTest&tests.test=testAcquisitionLockTimeoutForBatchesPostStartOffsetMovement()] which is unrelated to acquisition lock timeout, but more related to initialization of share partition
Re: [PR] KAFKA-16717 [2/N]: Add AdminClient.alterShareGroupOffsets [kafka]
AndrewJSchofield commented on code in PR #18929: URL: https://github.com/apache/kafka/pull/18929#discussion_r2083622116 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -3620,7 +3620,50 @@ class KafkaApis(val requestChannel: RequestChannel, def handleAlterShareGroupOffsetsRequest(request: RequestChannel.Request): Unit = { val alterShareGroupOffsetsRequest = request.body[AlterShareGroupOffsetsRequest] -requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) +val groupId = alterShareGroupOffsetsRequest.data.groupId + +if (!isShareGroupProtocolEnabled) { + requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) +} else if (!authHelper.authorize(request.context, READ, GROUP, groupId)) { + requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) +} else { + val responseBuilder = new AlterShareGroupOffsetsResponse.Builder() + val authorizedTopicPartitions = new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopicCollection() + + alterShareGroupOffsetsRequest.data.topics.forEach(topic => { +val invalidTopicError = checkValidTopic(topic.topicName()) +val topicError = invalidTopicError.orElse { + if (!authHelper.authorize(request.context, READ, TOPIC, topic.topicName())) { +Some(new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED)) + } else if (!metadataCache.contains(topic.topicName())) { +Some(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION)) + } else { +None + } +} +topicError match { + case Some(error) => +topic.partitions().forEach(partition => responseBuilder.addPartition(topic.topicName(), partition.partitionIndex(), error.error)) + case None => +authorizedTopicPartitions.add(topic) +} + }) + + val data = new AlterShareGroupOffsetsRequestData() +.setGroupId(groupId) +.setTopics(authorizedTopicPartitions) + 
groupCoordinator.alterShareGroupOffsets( +request.context, +groupId, +data + ).handle[Unit] { (response, exception) => +if (exception != null) { + requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(exception)) +} else { + requestHelper.sendMaybeThrottle(request, responseBuilder.merge(response).build()) +} + } +} CompletableFuture.completedFuture[Unit](()) Review Comment: There is no good reason. In KafkaApis.scala, some handle methods return `CompletableFuture` while others are just `Unit`. It would be nice if they were all the same (and even better if it was Java, but that's for another day). ## clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsRequest.java: ## @@ -65,7 +65,8 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { .setPartitions(topicResult.partitions().stream() .map(partitionData -> new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition() .setPartitionIndex(partitionData.partitionIndex()) -.setErrorCode(Errors.forException(e).code())) +.setErrorCode(Errors.forException(e).code()) Review Comment: `Errors.forException()` scans the list of Errors matching on exception class. I would prefer this to be done just once, such as assigning a local Errors variable and then using it to set the error code and message on each loop iteration. ## clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsRequest.java: ## @@ -78,6 +79,25 @@ public static AlterShareGroupOffsetsRequest parse(Readable readable, short versi ); } +public static AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic getErrorAlterShareGroup( +Errors error +) { +return new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic() Review Comment: In `DeleteShareGroupOffsetsResponse`, there is a top-level error code and message. These are missing in `AlterShareGroupOffsetsResponse`. 
It would probably be best to add them, rather than setting an empty topic name and -1 partition index like this. wdyt?
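The suggestion about resolving the error once, rather than on every iteration, can be sketched generically. Nothing below is Kafka code: `SketchError`, `forException` and the `lookups` counter are hypothetical stand-ins that mimic a lookup which scans a list of error types, so the cost of repeating it per partition is visible.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of hoisting an exception-to-error lookup out of a loop.
final class ErrorLookupSketch {
    // Counts how many times the scan ran; exists only for this illustration.
    static int lookups = 0;

    // Stand-in error enum; not Kafka's Errors class.
    enum SketchError {
        UNKNOWN((short) -1, RuntimeException.class),
        INVALID((short) 42, IllegalArgumentException.class);

        final short code;
        final Class<? extends Throwable> type;

        SketchError(short code, Class<? extends Throwable> type) {
            this.code = code;
            this.type = type;
        }

        // Linear scan over the known errors, matching on exception class.
        static SketchError forException(Throwable t) {
            lookups++;
            for (SketchError error : values()) {
                if (error != UNKNOWN && error.type.isInstance(t)) {
                    return error;
                }
            }
            return UNKNOWN;
        }
    }

    // Resolves the error once, then reuses it for every partition.
    static List<Short> errorCodesFor(int partitions, Throwable t) {
        SketchError error = SketchError.forException(t);
        List<Short> codes = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            codes.add(error.code);
        }
        return codes;
    }

    public static void main(String[] args) {
        System.out.println(errorCodesFor(3, new IllegalArgumentException("bad offset")));
        System.out.println(lookups);
    }
}
```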
Re: [PR] MINOR: Upgrade jetty version [kafka]
frankvicky merged PR #19680: URL: https://github.com/apache/kafka/pull/19680
Re: [PR] MINOR: Fix unchecked type warnings in several test classes [kafka]
frankvicky commented on code in PR #19679: URL: https://github.com/apache/kafka/pull/19679#discussion_r2083748605 ## metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java: ## @@ -446,7 +446,7 @@ public void testClearElrRecordForNonExistTopic() { @Test public void testTopicDeltaElectionStatsWithEmptyImage() { -TopicImage image = new TopicImage("topic", Uuid.randomUuid(), Collections.EMPTY_MAP); +TopicImage image = new TopicImage("topic", Uuid.randomUuid(), Collections.emptyMap()); Review Comment: `Collections.emptyMap()`
Re: [PR] Refactored DescribeTopicPartitionsRequestHandler: Improve readability and add code documentation [kafka]
github-actions[bot] commented on PR #19636: URL: https://github.com/apache/kafka/pull/19636#issuecomment-2870683846 A label of 'needs-attention' was automatically added to this PR in order to raise the attention of the committers. Once this issue has been triaged, the `triage` label should be removed to prevent this automation from happening again.
Re: [PR] KAFKA-19091: Fix race condition in DelayedFutureTest [kafka]
github-actions[bot] commented on PR #19553: URL: https://github.com/apache/kafka/pull/19553#issuecomment-2870683892 A label of 'needs-attention' was automatically added to this PR in order to raise the attention of the committers. Once this issue has been triaged, the `triage` label should be removed to prevent this automation from happening again.
Re: [PR] KAFKA-18066: Fix mismatched StreamThread ID in log messages [kafka]
github-actions[bot] commented on PR #19517: URL: https://github.com/apache/kafka/pull/19517#issuecomment-2870683922 A label of 'needs-attention' was automatically added to this PR in order to raise the attention of the committers. Once this issue has been triaged, the `triage` label should be removed to prevent this automation from happening again.
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
ijuma commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2083606229 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.image.MetadataImage; + +import net.jpountz.xxhash.XXHash64; +import net.jpountz.xxhash.XXHashFactory; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +public class UtilsTest { +private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid(); +private static final String FOO_TOPIC_NAME = "foo"; +private static final String BAR_TOPIC_NAME = "bar"; +private static final int FOO_NUM_PARTITIONS = 2; +private static final MetadataImage FOO_METADATA_IMAGE = new MetadataImageBuilder() +.addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS) +.addRacks() +.build(); +private static final XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64(); Review Comment: It looks like a high quality library from their description of why they didn't use existing ones: https://www.dynatrace.com/news/blog/hash4j-new-library-java/
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
ijuma commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2083606336 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.image.MetadataImage; + +import net.jpountz.xxhash.XXHash64; +import net.jpountz.xxhash.XXHashFactory; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +public class UtilsTest { +private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid(); +private static final String FOO_TOPIC_NAME = "foo"; +private static final String BAR_TOPIC_NAME = "bar"; +private static final int FOO_NUM_PARTITIONS = 2; +private static final MetadataImage FOO_METADATA_IMAGE = new MetadataImageBuilder() +.addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS) +.addRacks() +.build(); +private static final XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64(); Review Comment: If this is a server-side dependency, it seems fine. If it's also client-side, we probably need to think more about it.
[jira] [Resolved] (KAFKA-18695) Remove quorum=kraft and kip932 from all integration tests
[ https://issues.apache.org/jira/browse/KAFKA-18695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-18695. Fix Version/s: 4.1.0 Resolution: Fixed > Remove quorum=kraft and kip932 from all integration tests > - > > Key: KAFKA-18695 > URL: https://issues.apache.org/jira/browse/KAFKA-18695 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Ming-Yen Chung >Priority: Major > Fix For: 4.1.0 > > > all integration tests are using kraft mode so we don't need to use > `quorum=kraft`
[jira] [Updated] (KAFKA-18695) Remove quorum=kraft and kip932 from all integration tests
[ https://issues.apache.org/jira/browse/KAFKA-18695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-18695: --- Summary: Remove quorum=kraft and kip932 from all integration tests (was: Remove quorum=kraft from all integration tests) > Remove quorum=kraft and kip932 from all integration tests > - > > Key: KAFKA-18695 > URL: https://issues.apache.org/jira/browse/KAFKA-18695 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Ming-Yen Chung >Priority: Major > > all integration tests are using kraft mode so we don't need to use > `quorum=kraft`
Re: [PR] KAFKA-18695: Remove quorum=kraft and kip932 from all integration tests [kafka]
chia7712 merged PR #19633: URL: https://github.com/apache/kafka/pull/19633
[PR] MINOR: fit HTML markup [kafka]
mjsax opened a new pull request, #19676: URL: https://github.com/apache/kafka/pull/19676 (no comment)
Re: [PR] KAFKA-18904: [1/N] Change ListClientMetricsResources API to ListConfigResources [kafka]
AndrewJSchofield commented on code in PR #19493: URL: https://github.com/apache/kafka/pull/19493#discussion_r2083608078

## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ##
@@ -10929,47 +10928,231 @@ class KafkaApisTest extends Logging {
   }

   @Test
-  def testListClientMetricsResources(): Unit = {
-    val request = buildRequest(new ListClientMetricsResourcesRequest.Builder(new ListClientMetricsResourcesRequestData()).build())
-    metadataCache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
+  def testListConfigResourcesV0(): Unit = {
+    val request = buildRequest(new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()).build(0))
+    metadataCache = mock(classOf[KRaftMetadataCache])
+
+    val resources = util.Set.of("client-metric1", "client-metric2")
+    when(clientMetricsManager.listClientMetricsResources).thenReturn(resources)

-    val resources = new mutable.HashSet[String]
-    resources.add("test1")
-    resources.add("test2")
-    when(clientMetricsManager.listClientMetricsResources).thenReturn(resources.asJava)
     kafkaApis = createKafkaApis()
     kafkaApis.handle(request, RequestLocal.noCaching)
-    val response = verifyNoThrottling[ListClientMetricsResourcesResponse](request)
-    val expectedResponse = new ListClientMetricsResourcesResponseData().setClientMetricsResources(
-      resources.map(resource => new ClientMetricsResource().setName(resource)).toBuffer.asJava)
-    assertEquals(expectedResponse, response.data)
+    val response = verifyNoThrottling[ListConfigResourcesResponse](request)
+    val expectedResponse = new ListConfigResourcesResponse(
+      new ListConfigResourcesResponseData()
+        .setConfigResources(
+          resources.stream.map(resource =>
+            new ListConfigResourcesResponseData.ConfigResource().setResourceName(resource)
+          ).toList)
+    )
+    assertEquals(expectedResponse.data, response.data)
+
+    verify(metadataCache, never).getAllTopics
+    verify(groupConfigManager, never).groupIds
+    verify(metadataCache, never).getBrokerNodes(any)
   }

   @Test
-  def testListClientMetricsResourcesEmptyResponse(): Unit = {
-    val request = buildRequest(new ListClientMetricsResourcesRequest.Builder(new ListClientMetricsResourcesRequestData()).build())
-    metadataCache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
+  def testListConfigResourcesV1WithEmptyResourceTypes(): Unit = {
+    val request = buildRequest(new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()).build(1))
+    metadataCache = mock(classOf[KRaftMetadataCache])
+
+    val clientMetrics = util.Set.of("client-metric1", "client-metric2")
+    val topics = util.Set.of("topic1", "topic2")
+    val groupIds = util.List.of("group1", "group2")
+    val nodeIds = util.List.of(1, 2)
+    when(clientMetricsManager.listClientMetricsResources).thenReturn(clientMetrics)
+    when(metadataCache.getAllTopics).thenReturn(topics)
+    when(groupConfigManager.groupIds).thenReturn(groupIds)
+    when(metadataCache.getBrokerNodes(any())).thenReturn(nodeIds.stream().map(id => new Node(id, "localhost", 1234)).toList)
+
+    kafkaApis = createKafkaApis()
+    kafkaApis.handle(request, RequestLocal.noCaching)
+    val response = verifyNoThrottling[ListConfigResourcesResponse](request)
+    val expectedResponse = new ListConfigResourcesResponse(
+      new ListConfigResourcesResponseData()
+        .setConfigResources(
+          util.stream.Stream.of(
+            groupIds.stream().map(resource =>
+              new ListConfigResourcesResponseData.ConfigResource().setResourceName(resource).setResourceType(ConfigResource.Type.GROUP.id)
+            ).toList,
+            clientMetrics.stream.map(resource =>
+              new ListConfigResourcesResponseData.ConfigResource().setResourceName(resource).setResourceType(ConfigResource.Type.CLIENT_METRICS.id)
+            ).toList,
+            nodeIds.stream().map(resource =>
+              new ListConfigResourcesResponseData.ConfigResource().setResourceName(resource.toString).setResourceType(ConfigResource.Type.BROKER_LOGGER.id)
+            ).toList,
+            nodeIds.stream().map(resource =>
+              new ListConfigResourcesResponseData.ConfigResource().setResourceName(resource.toString).setResourceType(ConfigResource.Type.BROKER.id)
+            ).toList,
+            topics.stream().map(resource =>
+              new ListConfigResourcesResponseData.ConfigResource().setResourceName(resource).setResourceType(ConfigResource.Type.TOPIC.id)
+            ).toList
+          ).flatMap(s => s.stream).toList)
+    )
+    assertEquals(expectedResponse.data, response.data)
+  }
+
+  @Test
+  def testListConfigResourcesV1WithGroup(): Unit = {
+    val request = buildRequest(new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()
+      .setResourceTypes(util.List.of(ConfigResource.
Re: [PR] KAFKA-19264: Remove fallback for thread pool sizes in RemoteLogManagerConfig [kafka]
chia7712 merged PR #19673: URL: https://github.com/apache/kafka/pull/19673 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-19264) Remove fallback for thread pool sizes in RemoteLogManagerConfig
[ https://issues.apache.org/jira/browse/KAFKA-19264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-19264. Fix Version/s: 4.1.0 Resolution: Fixed > Remove fallback for thread pool sizes in RemoteLogManagerConfig > --- > > Key: KAFKA-19264 > URL: https://issues.apache.org/jira/browse/KAFKA-19264 > Project: Kafka > Issue Type: Task > Components: Tiered-Storage >Reporter: Kuan Po Tseng >Assignee: Kuan Po Tseng >Priority: Minor > Fix For: 4.1.0 > > > The fallback mechanism was first introduced in > [KIP-950|https://cwiki.apache.org/confluence/x/joqzDw]. According to the > proposal, if no thread values are set for > {{remote.log.manager.copier.thread.pool.size}} and > {{{}remote.log.manager.expiration.thread.pool.size{}}}, these two configs > would default to using the value of > {{{}remote.log.manager.thread.pool.size{}}}. > As quoted from the KIP: > {quote}If no thread values are set for the two new configurations presented > later on in the document we will default to using the same number of threads > in each pool as detailed by remote.log.manager.thread.pool.size. > {quote} > This fallback behavior was implemented in > [https://github.com/apache/kafka/commit/84ab3b9a5c4930f5ae047df088e38c456c7cde54]. > However, this approach was abandoned in > [KIP-1030|https://cwiki.apache.org/confluence/x/FAqpEQ], where the default > values for {{copier}} and {{expiration}} thread pool sizes were changed from > {{-1}} to {{{}10{}}}. The related commit can be found in > [https://github.com/apache/kafka/commit/3b1bd3812e48d488e4b6b53a9085d6552e8adf02]. > Additionally, both {{remote.log.manager.copier.thread.pool.size}} and > {{remote.log.manager.expiration.thread.pool.size}} now include a > configuration validator that enforces a minimum value of {{{}1{}}}. 
This > means the fallback mechanism should be removed from > [RemoteLogManagerConfig.java|https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java] > to align with the new defaults and validation. > In short, RemoteLogManagerConfig should apply the following changes: > {code:java} > public int remoteLogManagerCopierThreadPoolSize() { > -int size = > config.getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP); > -return size == -1 ? remoteLogManagerThreadPoolSize() : size; > +return > config.getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP); > } > public int remoteLogManagerExpirationThreadPoolSize() { > -int size = > config.getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP); > -return size == -1 ? remoteLogManagerThreadPoolSize() : size; > +return > config.getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP); > }{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] MINOR: Add deprecation warning for `log.cleaner.enable` when disabled [kafka]
Mirai1129 opened a new pull request, #19674: URL: https://github.com/apache/kafka/pull/19674

The `log.cleaner.enable` config has been deprecated; from Kafka 5.0 onward, the log cleaner can no longer be disabled.
Re: [PR] MINOR: Add deprecation warning for `log.cleaner.enable` when disabled [kafka]
chia7712 commented on code in PR #19674: URL: https://github.com/apache/kafka/pull/19674#discussion_r2083581944

## core/src/main/scala/kafka/log/LogManager.scala: ##
@@ -629,6 +629,8 @@ class LogManager(logDirs: Seq[File],
     if (cleanerConfig.enableCleaner) {
       _cleaner = new LogCleaner(cleanerConfig, liveLogDirs.asJava, currentLogs, logDirFailureChannel, time)
       _cleaner.startup()
+    } else {
+      warn("The parameter `log.cleaner.enable` is deprecated and will be removed in Kafka 5.0.")

Review Comment:
`parameter` -> `config`. Also, could you please highlight that in 5.0 the cleaner is always enabled?
Re: [PR] MINOR: Add deprecation warning for `log.cleaner.enable` when disabled [kafka]
Mirai1129 commented on code in PR #19674: URL: https://github.com/apache/kafka/pull/19674#discussion_r2083583209

## core/src/main/scala/kafka/log/LogManager.scala: ##
@@ -629,6 +629,8 @@ class LogManager(logDirs: Seq[File],
     if (cleanerConfig.enableCleaner) {
       _cleaner = new LogCleaner(cleanerConfig, liveLogDirs.asJava, currentLogs, logDirFailureChannel, time)
       _cleaner.startup()
+    } else {
+      warn("The parameter `log.cleaner.enable` is deprecated and will be removed in Kafka 5.0.")

Review Comment:
Thank you so much! I'll update them directly!
Re: [PR] KAFKA-18688: Fix uniform homogeneous assignor stability [kafka]
squah-confluent commented on PR #19677: URL: https://github.com/apache/kafka/pull/19677#issuecomment-2870221833

Benchmarks before and after. Scroll right for the "After" numbers.

```
Benchmark: ServerSideAssignorBenchmark.doAssignment
assignorType: UNIFORM
isRackAware: false
subscriptionType: HOMOGENEOUS

                                                                                     Before                After
(assignmentType)  (memberCount)  (partitionsToMemberRatio)  (topicCount)  Mode  Cnt  Score   Error  Units  Score   Error  Units
            FULL            100                          5            10  avgt    5  0.020 ± 0.001  ms/op  0.020 ± 0.001  ms/op
            FULL            100                          5           100  avgt    5  0.027 ± 0.001  ms/op  0.026 ± 0.002  ms/op
            FULL            100                          5          1000  avgt    5  0.066 ± 0.001  ms/op  0.063 ± 0.002  ms/op
            FULL            100                         10            10  avgt    5  0.030 ± 0.001  ms/op  0.033 ± 0.001  ms/op
            FULL            100                         10           100  avgt    5  0.035 ± 0.001  ms/op  0.037 ± 0.003  ms/op
            FULL            100                         10          1000  avgt    5  0.131 ± 0.005  ms/op  0.129 ± 0.003  ms/op
            FULL            100                         50            10  avgt    5  0.134 ± 0.004  ms/op  0.121 ± 0.002  ms/op
            FULL            100                         50           100  avgt    5  0.125 ± 0.002  ms/op  0.144 ± 0.003  ms/op
            FULL            100                         50          1000  avgt    5  0.205 ± 0.005  ms/op  0.193 ± 0.008  ms/op
            FULL            500                          5            10  avgt    5  0.089 ± 0.002  ms/op  0.090 ± 0.002  ms/op
            FULL            500                          5           100  avgt    5  0.108 ± 0.006  ms/op  0.101 ± 0.001  ms/op
            FULL            500                          5          1000  avgt    5  0.176 ± 0.008  ms/op  0.174 ± 0.005  ms/op
            FULL            500                         10            10  avgt    5  0.136 ± 0.003  ms/op  0.131 ± 0.002  ms/op
            FULL            500                         10           100  avgt    5  0.146 ± 0.004  ms/op  0.147 ± 0.002  ms/op
            FULL            500                         10          1000  avgt    5  0.224 ± 0.003  ms/op  0.241 ± 0.004  ms/op
            FULL            500                         50            10  avgt    5  0.668 ± 0.012  ms/op  0.699 ± 0.013  ms/op
            FULL            500                         50           100  avgt    5  0.685 ± 0.018  ms/op  0.696 ± 0.024  ms/op
            FULL            500                         50          1000  avgt    5  0.773 ± 0.021  ms/op  0.958 ± 0.028  ms/op
            FULL           1000                          5            10  avgt    5  0.179 ± 0.004  ms/op  0.168 ± 0.004  ms/op
            FULL           1000                          5           100  avgt    5  0.196 ± 0.002  ms/op  0.191 ± 0.004  ms/op
            FULL           1000                          5          1000  avgt    5  0.261 ± 0.006  ms/op  0.312 ± 0.004  ms/op
            FULL           1000                         10            10  avgt    5  0.288 ± 0.007  ms/op  0.264 ± 0.005  ms/op
            FULL           1000                         10           100  avgt    5  0.290 ± 0.012  ms/op  0.310 ± 0.055  ms/op
            FULL           1000                         10          1000  avgt    5  0.412 ± 0.004  ms/op  0.420 ± 0.012  ms/op
            FULL           1000                         50            10  avgt    5  1.261 ± 0.032  ms/op  1.264 ± 0.026  ms/op
            FULL           1000                         50           100  avgt    5  1.291 ± 0.036  ms/op  1.399 ± 0.057  ms/op
            FULL           1000                         50          1000  avgt    5  1.225 ± 0.038  ms/op  1.454 ± 0.035  ms/op
            FULL           5000                          5            10  avgt    5  1.090 ± 0.042  ms/op  0.878 ± 0.021  ms/op
            FULL           5000                          5           100  avgt    5  1.077 ± 0.045  ms/op  1.023 ± 0.020  ms/op
            FULL           5000                          5          1000  avgt    5  1.163 ± 0.016  ms/op  1.100 ± 0.017  ms/op
            FULL           5000                         10            10  avgt    5  1.371 ± 0.029  ms/op  1.334 ± 0.017  ms/op
            FULL           5000
```
[PR] MINOR: Upgrade jetty version [kafka]
frankvicky opened a new pull request, #19680: URL: https://github.com/apache/kafka/pull/19680

While building the RC, the current Jetty version was flagged for a CVE, so we should upgrade Jetty to fix it.
[jira] [Created] (KAFKA-19264) Remove fallback for thread pool sizes in RemoteLogManagerConfig
Kuan Po Tseng created KAFKA-19264: - Summary: Remove fallback for thread pool sizes in RemoteLogManagerConfig Key: KAFKA-19264 URL: https://issues.apache.org/jira/browse/KAFKA-19264 Project: Kafka Issue Type: Task Components: Tiered-Storage Reporter: Kuan Po Tseng Assignee: Kuan Po Tseng The fallback mechanism was first introduced in [KIP-950|https://cwiki.apache.org/confluence/x/joqzDw]. According to the proposal, if no thread values are set for {{remote.log.manager.copier.thread.pool.size}} and {{{}remote.log.manager.expiration.thread.pool.size{}}}, these two configs would default to using the value of {{{}remote.log.manager.thread.pool.size{}}}. As quoted from the KIP: {quote}If no thread values are set for the two new configurations presented later on in the document we will default to using the same number of threads in each pool as detailed by remote.log.manager.thread.pool.size. {quote} This fallback behavior was implemented in [https://github.com/apache/kafka/commit/84ab3b9a5c4930f5ae047df088e38c456c7cde54]. However, this approach was abandoned in [KIP-1030|https://cwiki.apache.org/confluence/x/FAqpEQ], where the default values for {{copier}} and {{expiration}} thread pool sizes were changed from {{-1}} to {{{}10{}}}. The related commit can be found in [https://github.com/apache/kafka/commit/3b1bd3812e48d488e4b6b53a9085d6552e8adf02]. Additionally, both {{remote.log.manager.copier.thread.pool.size}} and {{remote.log.manager.expiration.thread.pool.size}} now include a configuration validator that enforces a minimum value of {{{}1{}}}. This means the fallback mechanism should be removed from [{{RemoteLogManagerConfig.java}}|https://issues.apache.org/jira/issues/RemoteLogManagerConfig.java] to align with the new defaults and validation. 
In short, RemoteLogManagerConfig should apply the following changes: {code:java} public int remoteLogManagerCopierThreadPoolSize() { -int size = config.getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP); -return size == -1 ? remoteLogManagerThreadPoolSize() : size; +return config.getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP); } public int remoteLogManagerExpirationThreadPoolSize() { -int size = config.getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP); -return size == -1 ? remoteLogManagerThreadPoolSize() : size; +return config.getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP); }{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-19264) Remove fallback for thread pool sizes in RemoteLogManagerConfig
[ https://issues.apache.org/jira/browse/KAFKA-19264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kuan Po Tseng updated KAFKA-19264: -- Description: The fallback mechanism was first introduced in [KIP-950|https://cwiki.apache.org/confluence/x/joqzDw]. According to the proposal, if no thread values are set for {{remote.log.manager.copier.thread.pool.size}} and {{{}remote.log.manager.expiration.thread.pool.size{}}}, these two configs would default to using the value of {{{}remote.log.manager.thread.pool.size{}}}. As quoted from the KIP: {quote}If no thread values are set for the two new configurations presented later on in the document we will default to using the same number of threads in each pool as detailed by remote.log.manager.thread.pool.size. {quote} This fallback behavior was implemented in [https://github.com/apache/kafka/commit/84ab3b9a5c4930f5ae047df088e38c456c7cde54]. However, this approach was abandoned in [KIP-1030|https://cwiki.apache.org/confluence/x/FAqpEQ], where the default values for {{copier}} and {{expiration}} thread pool sizes were changed from {{-1}} to {{{}10{}}}. The related commit can be found in [https://github.com/apache/kafka/commit/3b1bd3812e48d488e4b6b53a9085d6552e8adf02]. Additionally, both {{remote.log.manager.copier.thread.pool.size}} and {{remote.log.manager.expiration.thread.pool.size}} now include a configuration validator that enforces a minimum value of {{{}1{}}}. This means the fallback mechanism should be removed from [RemoteLogManagerConfig.java|https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java] to align with the new defaults and validation. In short, RemoteLogManagerConfig should apply the following changes: {code:java} public int remoteLogManagerCopierThreadPoolSize() { -int size = config.getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP); -return size == -1 ? 
remoteLogManagerThreadPoolSize() : size; +return config.getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP); } public int remoteLogManagerExpirationThreadPoolSize() { -int size = config.getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP); -return size == -1 ? remoteLogManagerThreadPoolSize() : size; +return config.getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP); }{code} was: The fallback mechanism was first introduced in [KIP-950|https://cwiki.apache.org/confluence/x/joqzDw]. According to the proposal, if no thread values are set for {{remote.log.manager.copier.thread.pool.size}} and {{{}remote.log.manager.expiration.thread.pool.size{}}}, these two configs would default to using the value of {{{}remote.log.manager.thread.pool.size{}}}. As quoted from the KIP: {quote}If no thread values are set for the two new configurations presented later on in the document we will default to using the same number of threads in each pool as detailed by remote.log.manager.thread.pool.size. {quote} This fallback behavior was implemented in [https://github.com/apache/kafka/commit/84ab3b9a5c4930f5ae047df088e38c456c7cde54]. However, this approach was abandoned in [KIP-1030|https://cwiki.apache.org/confluence/x/FAqpEQ], where the default values for {{copier}} and {{expiration}} thread pool sizes were changed from {{-1}} to {{{}10{}}}. The related commit can be found in [https://github.com/apache/kafka/commit/3b1bd3812e48d488e4b6b53a9085d6552e8adf02]. Additionally, both {{remote.log.manager.copier.thread.pool.size}} and {{remote.log.manager.expiration.thread.pool.size}} now include a configuration validator that enforces a minimum value of {{{}1{}}}. This means the fallback mechanism should be removed from [{{RemoteLogManagerConfig.java}}|https://issues.apache.org/jira/issues/RemoteLogManagerConfig.java] to align with the new defaults and validation. 
In short, RemoteLogManagerConfig should apply the following changes: {code:java} public int remoteLogManagerCopierThreadPoolSize() { -int size = config.getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP); -return size == -1 ? remoteLogManagerThreadPoolSize() : size; +return config.getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP); } public int remoteLogManagerExpirationThreadPoolSize() { -int size = config.getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP); -return size == -1 ? remoteLogManagerThreadPoolSize() : size; +return config.getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP); }{code} > Remove fallback for thread pool sizes in RemoteLogManagerConfig > --- > > Key: KAFKA-19264 > URL: https://issues.apache.org/jira/browse/KAFKA-19264 > Project: Kafka > Issue Type: Task >
[jira] [Resolved] (KAFKA-19139) Plugin#wrapInstance should use LinkedHashMap instead of Map
[ https://issues.apache.org/jira/browse/KAFKA-19139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] TaiJuWu resolved KAFKA-19139. - Resolution: Fixed > Plugin#wrapInstance should use LinkedHashMap instead of Map > > > Key: KAFKA-19139 > URL: https://issues.apache.org/jira/browse/KAFKA-19139 > Project: Kafka > Issue Type: Sub-task >Reporter: 黃竣陽 >Assignee: 黃竣陽 >Priority: Major > > See discussion: > https://github.com/apache/kafka/pull/19050#discussion_r2041133707 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-19264: Remove fallback for thread pool sizes in RemoteLogManagerConfig [kafka]
brandboat opened a new pull request, #19673: URL: https://github.com/apache/kafka/pull/19673 The fallback mechanism for `remote.log.manager.copier.thread.pool.size` and `remote.log.manager.expiration.thread.pool.size` defaulting to `remote.log.manager.thread.pool.size` was introduced in KIP-950. This approach was abandoned in KIP-1030, where default values were changed from -1 to 10, and a configuration validator enforcing a minimum value of 1 was added. As a result, this commit removes the fallback mechanism from `RemoteLogManagerConfig.java` to align with the new defaults and validation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
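The reasoning in this description can be illustrated with a minimal sketch. It is not Kafka's actual `RemoteLogManagerConfig` or `ConfigDef` code: the class `ThreadPoolConfigSketch`, its constructor, and the `Map`-based lookup are hypothetical stand-ins, and only the property name, the default of 10, and the minimum of 1 come from the description above. Once the default is positive and values below 1 are rejected up front, a getter can never observe `-1`, so a fallback branch keyed on `-1` would be dead code.

```java
import java.util.Map;

// Hypothetical stand-in for a validated config; not Kafka's real ConfigDef/AbstractConfig API.
final class ThreadPoolConfigSketch {
    static final String COPIER_PROP = "remote.log.manager.copier.thread.pool.size";
    static final int DEFAULT_SIZE = 10; // default after KIP-1030, per the description above
    static final int MIN_SIZE = 1;      // minimum enforced by the validator, per the description above

    private final int copierSize;

    ThreadPoolConfigSketch(Map<String, Integer> overrides) {
        int size = overrides.getOrDefault(COPIER_PROP, DEFAULT_SIZE);
        if (size < MIN_SIZE) {
            // Mimics a config validator rejecting the value before any getter runs.
            throw new IllegalArgumentException(
                COPIER_PROP + " must be at least " + MIN_SIZE + ", got " + size);
        }
        this.copierSize = size;
    }

    // No "-1 means fall back to the shared pool size" branch is needed here.
    int copierThreadPoolSize() {
        return copierSize;
    }

    public static void main(String[] args) {
        System.out.println(new ThreadPoolConfigSketch(Map.of()).copierThreadPoolSize());
        System.out.println(new ThreadPoolConfigSketch(Map.of(COPIER_PROP, 4)).copierThreadPoolSize());
    }
}
```

In this sketch an explicit `-1` fails at construction time rather than silently borrowing another pool's size, which is the behavior the new defaults and validation imply.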
[jira] [Updated] (KAFKA-19265) Use snapshotting of remote log metedata topic that is used when a broker is restarted.
[ https://issues.apache.org/jira/browse/KAFKA-19265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-19265: --- Description: This is an incremental approach to the remote log metadata snapshot mechanism already covered in KIP-405. > Use snapshotting of remote log metedata topic that is used when a broker is > restarted. > -- > > Key: KAFKA-19265 > URL: https://issues.apache.org/jira/browse/KAFKA-19265 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Satish Duggana >Assignee: Kamal Chandraprakash >Priority: Major > Fix For: 4.1.0 > > > This is an incremental approach to the remote log metadata snapshot mechanism > already covered in KIP-405. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-19265) Use snapshotting of remote log metedata topic that is used when a broker is restarted.
Satish Duggana created KAFKA-19265: -- Summary: Use snapshotting of remote log metedata topic that is used when a broker is restarted. Key: KAFKA-19265 URL: https://issues.apache.org/jira/browse/KAFKA-19265 Project: Kafka Issue Type: Improvement Components: core Reporter: Satish Duggana Assignee: Kamal Chandraprakash Fix For: 4.1.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-13610) Make log.cleaner.enable dynamically configurable
[ https://issues.apache.org/jira/browse/KAFKA-13610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17950756#comment-17950756 ] Chia-Ping Tsai commented on KAFKA-13610: We should change the min value of log.cleaner.threads too, as the value 0 is weird in 5.0 if we remove the log.cleaner.enable. I have sent a mail to the KIP mail. Also, we should add enough warning messages when users configure them. Will file a minor patch to address it > Make log.cleaner.enable dynamically configurable > > > Key: KAFKA-13610 > URL: https://issues.apache.org/jira/browse/KAFKA-13610 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Assignee: TengYao Chi >Priority: Major > Labels: kip > Fix For: 5.0.0 > > > It's odd that things like the number of log cleaner threads are configurable, > but not whether it is on or off. We should make log.cleaner.enable > dynamically configurable to close this gap. > Additionally, from a code point of view, we should unconditionally create the > log cleaner object in the LogManager constructor even if we never start it. > This would simplify the code and eliminate many null checks. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-19262) Add a test to verify all metrics naming pattern
黃竣陽 created KAFKA-19262:

Summary: Add a test to verify all metrics naming pattern
Key: KAFKA-19262
URL: https://issues.apache.org/jira/browse/KAFKA-19262
Project: Kafka
Issue Type: Improvement
Reporter: 黃竣陽
Assignee: 黃竣陽

We should add an integration test for KafkaYammerMetrics to ensure all metric names conform to the expected naming convention.

See discussion: [https://lists.apache.org/thread/5wx7b724v9yhqytonbvc3vbyf5fsbsrp]
[jira] [Assigned] (KAFKA-19263) The docs of delete.topic.enable used by Admin#deletetopics is out-of-date
[ https://issues.apache.org/jira/browse/KAFKA-19263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yu Chia Ma reassigned KAFKA-19263:

Assignee: Yu Chia Ma

> The docs of delete.topic.enable used by Admin#deletetopics is out-of-date
>
> Key: KAFKA-19263
> URL: https://issues.apache.org/jira/browse/KAFKA-19263
> Project: Kafka
> Issue Type: Improvement
> Reporter: Chia-Ping Tsai
> Assignee: Yu Chia Ma
> Priority: Trivial
>
> The docs currently state:
>  * If delete.topic.enable is false on the brokers, deleteTopics will mark
>  * the topics for deletion, but not actually delete them. The futures will
>  * return successfully in this case.
> This is no longer true, as the server now returns an exception:
> if (!config.deleteTopicEnable) {
>   if (apiVersion < 3) {
>     throw new InvalidRequestException("Topic deletion is disabled.")
>   } else {
>     throw new TopicDeletionDisabledException()
>   }
> }
[jira] [Created] (KAFKA-19263) The docs of delete.topic.enable used by Admin#deletetopics is out-of-date
Chia-Ping Tsai created KAFKA-19263:

Summary: The docs of delete.topic.enable used by Admin#deletetopics is out-of-date
Key: KAFKA-19263
URL: https://issues.apache.org/jira/browse/KAFKA-19263
Project: Kafka
Issue Type: Improvement
Reporter: Chia-Ping Tsai

The docs currently state:
 * If delete.topic.enable is false on the brokers, deleteTopics will mark
 * the topics for deletion, but not actually delete them. The futures will
 * return successfully in this case.
This is no longer true, as the server now returns an exception:
if (!config.deleteTopicEnable) {
  if (apiVersion < 3) {
    throw new InvalidRequestException("Topic deletion is disabled.")
  } else {
    throw new TopicDeletionDisabledException()
  }
}
Re: [PR] KAFKA-19242: Fix commit bugs caused by race condition during rebalancing. [kafka]
chickenchickenlove commented on PR #19631: URL: https://github.com/apache/kafka/pull/19631#issuecomment-2869870166 @dajac Sorry to bother you...! Gently ping 🙇♂️
[PR] KAFKA-19263: update `delete.topic.enable` docs [kafka]
Mirai1129 opened a new pull request, #19675: URL: https://github.com/apache/kafka/pull/19675

The docs currently state: "If delete.topic.enable is false on the brokers, deleteTopics will mark the topics for deletion, but not actually delete them. The futures will return successfully in this case."

This is no longer true, as the server now returns an exception:

```scala
if (!config.deleteTopicEnable) {
  if (apiVersion < 3) {
    throw new InvalidRequestException("Topic deletion is disabled.")
  } else {
    throw new TopicDeletionDisabledException()
  }
}
```
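The version-dependent behavior quoted in this description can be sketched in plain Java. This is a hedged illustration, not broker code: `DeleteTopicsGuardSketch`, `ensureDeletionEnabled`, and the two nested exception classes are hypothetical stand-ins that only reuse the names from the snippet in the description, and the version threshold of 3 is taken from that snippet.

```java
final class DeleteTopicsGuardSketch {
    // Hypothetical stand-ins for the exception types named in the quoted snippet.
    static class InvalidRequestException extends RuntimeException {
        InvalidRequestException(String message) {
            super(message);
        }
    }

    static class TopicDeletionDisabledException extends RuntimeException {
    }

    // Mirrors the quoted check: a disabled broker fails the request instead of
    // marking topics for deletion and reporting success.
    static void ensureDeletionEnabled(boolean deleteTopicEnable, int apiVersion) {
        if (deleteTopicEnable) {
            return;
        }
        if (apiVersion < 3) {
            throw new InvalidRequestException("Topic deletion is disabled.");
        }
        throw new TopicDeletionDisabledException();
    }

    public static void main(String[] args) {
        for (int version : new int[] {2, 3}) {
            try {
                ensureDeletionEnabled(false, version);
            } catch (RuntimeException e) {
                System.out.println("v" + version + " -> " + e.getClass().getSimpleName());
            }
        }
    }
}
```

Either way the caller sees a failure, which is why documentation promising a successful future would be out of date.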
[PR] MINOR: Fix unchecked type warnings in several test classes [kafka]
YutaLin opened a new pull request, #19679: URL: https://github.com/apache/kafka/pull/19679

* In ConsoleShareConsumerTest, add the @SuppressWarnings("unchecked") annotation to the method shouldUpgradeDeliveryCount
* In ListConsumerGroupOffsetsHandlerTest, add generic parameters to the HashSet constructors
* In TopicsImageTest, add an explicit generic type to Collections.EMPTY_MAP to fix raw type usage

Reviewers: Chia-Ping Tsai [chia7...@gmail.com](mailto:chia7...@gmail.com)
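For readers unfamiliar with these warnings, here is a small sketch of two common ways to avoid raw types. It is not taken from the PR, and the PR may have fixed the tests differently: the class `GenericsWarningSketch` and its methods are hypothetical, and only standard `java.util` calls are used.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class GenericsWarningSketch {
    // Collections.EMPTY_MAP is a raw Map constant, so assigning it to a typed Map
    // triggers an unchecked warning. The generic factory method infers the types.
    static Map<String, Integer> emptyCounts() {
        return Collections.emptyMap();
    }

    // `new HashSet(names)` would be a raw type; the diamond operator supplies
    // the type argument from the declared return type.
    static Set<String> uniqueNames(List<String> names) {
        return new HashSet<>(names);
    }

    public static void main(String[] args) {
        System.out.println(emptyCounts().isEmpty());
        System.out.println(uniqueNames(List.of("a", "b", "a")).size());
    }
}
```

`@SuppressWarnings("unchecked")` remains the usual fallback when a cast genuinely cannot be made type-safe, as in the first bullet above.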
Re: [PR] KAFKA-14691; Add TopicId to OffsetFetch API [kafka]
frankvicky commented on code in PR #19515: URL: https://github.com/apache/kafka/pull/19515#discussion_r2083791909

## clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java: ##
@@ -42,17 +43,21 @@ public void testWithMultipleGroups(short version) {
                 .setTopics(List.of(
                     new OffsetFetchRequestData.OffsetFetchRequestTopics()
                         .setName("foo")
+                        .setTopicId(Uuid.randomUuid())
+                        .setTopicId(Uuid.randomUuid())

Review Comment:
Is there an accidental line duplication here?
Re: [PR] KAFKA-14691; Add TopicId to OffsetFetch API [kafka]
frankvicky commented on code in PR #19515: URL: https://github.com/apache/kafka/pull/19515#discussion_r2083796590

## clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java: ##
@@ -42,17 +43,21 @@ public void testWithMultipleGroups(short version) {
                 .setTopics(List.of(
                     new OffsetFetchRequestData.OffsetFetchRequestTopics()
                         .setName("foo")
+                        .setTopicId(Uuid.randomUuid())
+                        .setTopicId(Uuid.randomUuid())

Review Comment:
Several tests set topicId twice. I may be missing some context, though.
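To make the concern concrete, here is a minimal sketch of what a repeated fluent setter call does. It assumes the setter simply assigns a field and returns `this`, which is an assumption about the generated Kafka message classes rather than something shown in this thread. `FetchTopicSketch` is a hypothetical class, and `java.util.UUID` stands in for Kafka's `Uuid`.

```java
import java.util.UUID;

// Hypothetical fluent data class; it only mimics the setter-chaining style in the diff above.
final class FetchTopicSketch {
    private String name;
    private UUID topicId;

    FetchTopicSketch setName(String name) {
        this.name = name;
        return this;
    }

    FetchTopicSketch setTopicId(UUID topicId) {
        this.topicId = topicId;
        return this;
    }

    String name() {
        return name;
    }

    UUID topicId() {
        return topicId;
    }

    public static void main(String[] args) {
        UUID first = UUID.randomUUID();
        UUID second = UUID.randomUUID();
        FetchTopicSketch topic = new FetchTopicSketch()
            .setName("foo")
            .setTopicId(first)
            .setTopicId(second); // overwrites the value set on the previous line
        System.out.println(topic.topicId().equals(second)); // prints "true"
    }
}
```

Under that assumption the duplicated line is redundant rather than harmful: the second call wins and the first random id is discarded.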