Re: [PR] KAFKA-17885: Enable clients to rebootstrap based on timeout or error code (KIP-1102) [kafka]
rajinisivaram commented on code in PR #17720: URL: https://github.com/apache/kafka/pull/17720#discussion_r1835335844 ## clients/src/main/java/org/apache/kafka/clients/NetworkClient.java: ## @@ -1236,6 +1316,12 @@ public void close() { this.metadata.close(); } +private void rebootstrap(long now) { +closeAll(); +metadata.rebootstrap(); +metadataAttemptStartMs = Optional.of(now); Review Comment: It is to handle rebootstrapping again if successful metadata response is not received after one rebootstrap (the second case you mentioned). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17885: Enable clients to rebootstrap based on timeout or error code (KIP-1102) [kafka]
rajinisivaram commented on PR #17720: URL: https://github.com/apache/kafka/pull/17720#issuecomment-2466164414 @apoorvmittal10 Thanks for the review, I have responded to the comments.
Re: [PR] KAFKA-17885: Enable clients to rebootstrap based on timeout or error code (KIP-1102) [kafka]
rajinisivaram commented on code in PR #17720: URL: https://github.com/apache/kafka/pull/17720#discussion_r1835335426 ## clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java: ## @@ -289,17 +304,24 @@ public void update(Cluster cluster, long now) { this.state = State.QUIESCENT; this.fatalException = null; +this.metadataAttemptStartMs = Optional.empty(); if (!cluster.nodes().isEmpty()) { this.cluster = cluster; } } +public void initiateRebootstrap() { +requestUpdate(); +this.metadataAttemptStartMs = Optional.of(0L); Review Comment: In rebootstrap(), we set time to `now` as in NetworkClient. In both, when we receive REBOOTSTRAP_REQUIRED error code, we set to 0, so that rebootstrap is triggered on the next poll regardless of time. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
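The timing rule described in this comment can be sketched as follows. This is a minimal illustration, not the code from the PR: the class `RebootstrapTracker`, its method names, and the exact comparison in `shouldRebootstrap` are assumptions made for the example. Only the field name `metadataAttemptStartMs`, the use of `Optional.of(now)` after a rebootstrap, `Optional.of(0L)` on `REBOOTSTRAP_REQUIRED`, and `Optional.empty()` after a successful update come from the diffs quoted in this thread.

```java
import java.util.Optional;

// Hypothetical stand-in for the state kept by NetworkClient / AdminMetadataManager.
final class RebootstrapTracker {
    private final long rebootstrapTriggerMs;
    private Optional<Long> metadataAttemptStartMs = Optional.empty();

    RebootstrapTracker(long rebootstrapTriggerMs) {
        this.rebootstrapTriggerMs = rebootstrapTriggerMs;
    }

    // Remember when the current run of unanswered metadata attempts began.
    void onMetadataAttempt(long now) {
        if (!metadataAttemptStartMs.isPresent()) {
            metadataAttemptStartMs = Optional.of(now);
        }
    }

    // A successful metadata response clears the timer.
    void onSuccessfulUpdate() {
        metadataAttemptStartMs = Optional.empty();
    }

    // After a rebootstrap the timer restarts at `now`, so a second rebootstrap
    // can fire if no successful response arrives within the trigger interval.
    void onRebootstrap(long now) {
        metadataAttemptStartMs = Optional.of(now);
    }

    // On REBOOTSTRAP_REQUIRED the start time becomes 0, which makes the elapsed
    // time look very large (assuming `now` is wall-clock milliseconds).
    void onRebootstrapRequired() {
        metadataAttemptStartMs = Optional.of(0L);
    }

    // Assumed check: rebootstrap once the elapsed time exceeds the trigger interval.
    boolean shouldRebootstrap(long now) {
        return metadataAttemptStartMs
            .filter(startMs -> now - startMs > rebootstrapTriggerMs)
            .isPresent();
    }

    public static void main(String[] args) {
        RebootstrapTracker tracker = new RebootstrapTracker(1000L);
        tracker.onMetadataAttempt(1_000_000L);
        System.out.println(tracker.shouldRebootstrap(1_000_500L)); // still within the interval
        System.out.println(tracker.shouldRebootstrap(1_001_001L)); // interval exceeded
        tracker.onRebootstrapRequired();
        System.out.println(tracker.shouldRebootstrap(1_000_000L)); // fires on the next poll
    }
}
```

With a start time of 0 the elapsed time equals `now`, so under this sketch the check passes on the next poll for any trigger interval smaller than the current wall-clock value.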
Re: [PR] KAFKA-17885: Enable clients to rebootstrap based on timeout or error code (KIP-1102) [kafka]
rajinisivaram commented on code in PR #17720: URL: https://github.com/apache/kafka/pull/17720#discussion_r1835335638 ## clients/src/main/java/org/apache/kafka/clients/NetworkClient.java: ## @@ -166,11 +169,51 @@ public NetworkClient(Selectable selector, time, discoverBrokerVersions, apiVersions, - null, logContext, + Long.MAX_VALUE, Review Comment: Did you mean the NetworkClient in WorkerGroupMember? This PR changes the constructor used in WorkerGroupMember to pass in the configured value. Let me know if I have missed a different one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17925: Convert Kafka Client integration tests to use KRaft [kafka]
kirktrue commented on code in PR #17670: URL: https://github.com/apache/kafka/pull/17670#discussion_r1835554434 ## core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala: ## @@ -43,8 +43,9 @@ class FetchRequestTestDowngrade extends BaseRequestTest { ) } -@Test -def testTopicIdIsRemovedFromFetcherWhenControllerDowngrades(): Unit = { +@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit")) +def testTopicIdIsRemovedFromFetcherWhenControllerDowngrades(quorum: String, groupProtocol: String): Unit = { Review Comment: There are around 20 integration tests that are effectively ZooKeeper-only. In some cases, nothing in the test explicitly calls that dependency out. I introduced the `@MethodSource` `getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit()`, which supplies the test parameters `quorum=zk` and `groupProtocol=classic`. The idea is that someone more knowledgeable about the tests would review these and either a) remove the test, or b) update the test to run against KRaft, if possible.
Re: [PR] KAFKA-17974: Upgrade gradle from 8.10 to 8.10.2 [kafka]
chia7712 commented on code in PR #17734: URL: https://github.com/apache/kafka/pull/17734#discussion_r1835604703 ## gradle/wrapper/gradle-wrapper.properties: ## @@ -1,7 +1,7 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists distributionSha256Sum=682b4df7fe5accdca84a4d1ef6a3a6ab096b3efd5edf7de2bd8c758d95a93703 Review Comment: please update it to fix build error -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-17928) Make remote log manager thread-pool configs dynamic
[ https://issues.apache.org/jira/browse/KAFKA-17928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17896991#comment-17896991 ] Kamal Chandraprakash commented on KAFKA-17928: -- [~peterxcli] The KIP got approved. You can assign the ticket to yourself and start to work on this task. > Make remote log manager thread-pool configs dynamic > --- > > Key: KAFKA-17928 > URL: https://issues.apache.org/jira/browse/KAFKA-17928 > Project: Kafka > Issue Type: Task >Reporter: Kamal Chandraprakash >Priority: Major > Labels: kip > > The above configs are used to configure the number of threads to > copy/delete/read the data from remote storage. We need those thread-pool > configs to be updated dynamically to adjust them based on-demand. This is > similar to dynamically configuring the request handler threads > ({{{}num.io.threads{}}}): > * {{remote.log.manager.copier.thread.pool.size}} > * {{remote.log.manager.expiration.thread.pool.size}} > * {{remote.log.reader.threads}} and > * {{remote.log.manager.thread.pool.size}} (deprecated in > [KIP-950|https://cwiki.apache.org/confluence/display/KAFKA/KIP-950%3A++Tiered+Storage+Disablement]) > > This ticket requires > [KIP-1105|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1105%3A+Make+remote+log+manager+thread-pool+configs+dynamic] > to be approved. -- This message was sent by Atlassian Jira (v8.20.10#820010)
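For readers unfamiliar with what resizing a thread pool at runtime involves, here is a minimal sketch using only `java.util.concurrent`. It is not the KIP-1105 implementation and says nothing about how the remote log manager wires this up; the class `ResizablePool` and its methods are hypothetical names chosen for the example. The one point it illustrates is ordering: `ThreadPoolExecutor` rejects a maximum below the core size (and, on recent JDKs, a core size above the maximum), so the two setters have to be called in a different order when growing and when shrinking.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; not part of Kafka.
final class ResizablePool {

    // Build a fixed-size pool whose size can be changed later.
    static ThreadPoolExecutor fixedPool(int size) {
        return new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>());
    }

    // Apply a new size while keeping core <= max at every step.
    static void resize(ThreadPoolExecutor pool, int newSize) {
        if (newSize < 1) {
            throw new IllegalArgumentException("pool size must be positive");
        }
        if (newSize > pool.getMaximumPoolSize()) {
            // Growing: raise the maximum first, then the core size.
            pool.setMaximumPoolSize(newSize);
            pool.setCorePoolSize(newSize);
        } else {
            // Shrinking or unchanged: lower the core size first, then the maximum.
            pool.setCorePoolSize(newSize);
            pool.setMaximumPoolSize(newSize);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = fixedPool(10);
        resize(pool, 20);
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize());
        resize(pool, 5);
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize());
        pool.shutdown();
    }
}
```

Shrinking does not interrupt running tasks; surplus worker threads are retired once they become idle.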
Re: [PR] KAFKA-17053: Restructure build.gradle to configure publishing last [kafka]
KTKTK-HZ commented on code in PR #16950: URL: https://github.com/apache/kafka/pull/16950#discussion_r1835609331 ## build.gradle: ## @@ -41,7 +41,7 @@ plugins { id "com.github.spotbugs" version '6.0.25' apply false id 'org.scoverage' version '8.0.3' apply false - id 'io.github.goooler.shadow' version '8.1.3' apply false + id 'io.github.goooler.shadow' version '8.1.8' apply false Review Comment: @chia7712 Yeah, I tried to upgrade from Goooler/shadow to GradleUp/shadow, and after the upgrade the KAFKA-17052 problem did not occur. But as @chia7712 mentioned, this is because the GradleUp/shadow 8.3.0 release does not have the Goooler/shadow#80 patch that caused us to revert versions. Also, unfortunately, GradleUp/shadow does not fix the issue that caused KAFKA-16359. However, since Goooler/shadow is now retired, I will create a new ticket for changing the fork.
[PR] KAFKA-17569: Rewrite TestLinearWriteSpeed by Java [kafka]
frankvicky opened a new pull request, #17736: URL: https://github.com/apache/kafka/pull/17736 JIRA: KAFKA-17569 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-17053: Restructure build.gradle to configure publishing last [kafka]
KTKTK-HZ commented on code in PR #16950: URL: https://github.com/apache/kafka/pull/16950#discussion_r1835609331 ## build.gradle: ## @@ -41,7 +41,7 @@ plugins { id "com.github.spotbugs" version '6.0.25' apply false id 'org.scoverage' version '8.0.3' apply false - id 'io.github.goooler.shadow' version '8.1.3' apply false + id 'io.github.goooler.shadow' version '8.1.8' apply false Review Comment: @chia7712 Yeah, I tried to upgrade from Goooler/shadow to GradleUp/shadow, and after the upgrade the KAFKA-17052 problem did not occur. But as @gharris1727 mentioned, this is because the GradleUp/shadow 8.3.0 release does not have the Goooler/shadow#80 patch that caused us to revert versions. Also, unfortunately, GradleUp/shadow does not fix the issue that caused KAFKA-16359. However, since Goooler/shadow is now retired, I will create a new ticket for changing the fork.
[jira] [Assigned] (KAFKA-17928) Make remote log manager thread-pool configs dynamic
[ https://issues.apache.org/jira/browse/KAFKA-17928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Peter Lee reassigned KAFKA-17928: - Assignee: Peter Lee > Make remote log manager thread-pool configs dynamic > --- > > Key: KAFKA-17928 > URL: https://issues.apache.org/jira/browse/KAFKA-17928 > Project: Kafka > Issue Type: Task >Reporter: Kamal Chandraprakash >Assignee: Peter Lee >Priority: Major > Labels: kip > > The above configs are used to configure the number of threads to > copy/delete/read the data from remote storage. We need those thread-pool > configs to be updated dynamically to adjust them based on-demand. This is > similar to dynamically configuring the request handler threads > ({{{}num.io.threads{}}}): > * {{remote.log.manager.copier.thread.pool.size}} > * {{remote.log.manager.expiration.thread.pool.size}} > * {{remote.log.reader.threads}} and > * {{remote.log.manager.thread.pool.size}} (deprecated in > [KIP-950|https://cwiki.apache.org/confluence/display/KAFKA/KIP-950%3A++Tiered+Storage+Disablement]) > > This ticket requires > [KIP-1105|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1105%3A+Make+remote+log+manager+thread-pool+configs+dynamic] > to be approved. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-17974: Upgrade gradle from 8.10 to 8.10.2 [kafka]
chiacyu opened a new pull request, #17734: URL: https://github.com/apache/kafka/pull/17734 As titled, please see [KAFKA-17974](https://issues.apache.org/jira/browse/KAFKA-17974) for further details. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Assigned] (KAFKA-17975) Remove ControllerQuorumVotersFutureManager
[ https://issues.apache.org/jira/browse/KAFKA-17975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-17975: -- Assignee: Kuan Po Tseng (was: Chia-Ping Tsai) > Remove ControllerQuorumVotersFutureManager > -- > > Key: KAFKA-17975 > URL: https://issues.apache.org/jira/browse/KAFKA-17975 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Kuan Po Tseng >Priority: Minor > > `KafkaClusterTestKit` binds the port early, so we don't need to use callback > to get binding port now. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16780: Txn consumer exerts pressure on remote storage when collecting aborted txns [kafka]
kamalcph commented on code in PR #17659: URL: https://github.com/apache/kafka/pull/17659#discussion_r1835568807 ## storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java: ## @@ -209,4 +209,18 @@ void onPartitionLeadershipChanges(Set leaderPartitions, * @return Total size of the log stored in remote storage in bytes. */ long remoteLogSize(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException; + +/** + * Returns the next segment that contains the aborted txn entries for the given topic partition, epoch and offset. + * @param topicIdPartition topic partition to search for the next segment. + * @param epoch leader epoch of the txn index. + * @param offset offset + * @return the segment metadata that contains the txn index if exists. Otherwise, returns {@link Optional#empty()}. + * @throws RemoteStorageException if there are any storage related errors occurred. + */ +default Optional nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition, Review Comment: Opened #17735 to update the doc in the upgrade notes "other changes" section, since this is not a breaking change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16780: Txn consumer exerts pressure on remote storage when collecting aborted txns [kafka]
kamalcph commented on code in PR #17659: URL: https://github.com/apache/kafka/pull/17659#discussion_r1835568807 ## storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java: ## @@ -209,4 +209,18 @@ void onPartitionLeadershipChanges(Set leaderPartitions, * @return Total size of the log stored in remote storage in bytes. */ long remoteLogSize(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException; + +/** + * Returns the next segment that contains the aborted txn entries for the given topic partition, epoch and offset. + * @param topicIdPartition topic partition to search for the next segment. + * @param epoch leader epoch of the txn index. + * @param offset offset + * @return the segment metadata that contains the txn index if exists. Otherwise, returns {@link Optional#empty()}. + * @throws RemoteStorageException if there are any storage related errors occurred. + */ +default Optional nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition, Review Comment: Opened #17735 to document this in the upgrade notes "other changes" section, since this is not a breaking change.
Re: [PR] KAFKA-17923: Remove old kafka version from e2e [kafka]
FrankYang0529 commented on code in PR #17673: URL: https://github.com/apache/kafka/pull/17673#discussion_r1835569444 ## tests/kafkatest/sanity_checks/test_performance_services.py: ## @@ -38,15 +38,9 @@ def setUp(self): self.zk.start() @cluster(num_nodes=5) -# We are keeping 0.8.2 here so that we don't inadvertently break support for it. Since this is just a sanity check, -# the overhead should be manageable. -@parametrize(version=str(LATEST_0_8_2), new_consumer=False) -@parametrize(version=str(LATEST_0_9), new_consumer=False) -@parametrize(version=str(LATEST_0_9)) -@parametrize(version=str(LATEST_1_1), new_consumer=False) -@cluster(num_nodes=5) +@parametrize(version=str(LATEST_2_1), new_consumer=False) @matrix(version=[str(DEV_BRANCH)], metadata_quorum=quorum.all) Review Comment: If we apply only this change on the trunk branch, we get the same error. The reason is that `last` is still `None` when it is passed to `parse_results`, i.e. the loop over the command output never assigned it:
```
last = None
for line in node.account.ssh_capture(cmd):
    last = line

# Parse and save the last line's information
self.results[idx-1] = self.parse_results(last, node.version)
```
[jira] [Commented] (KAFKA-17975) Remove ControllerQuorumVotersFutureManager
[ https://issues.apache.org/jira/browse/KAFKA-17975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17896984#comment-17896984 ] Kuan Po Tseng commented on KAFKA-17975: --- Gentle ping [~chia7712], I can help with this one. Can I take over this issue? Thanks! > Remove ControllerQuorumVotersFutureManager > -- > > Key: KAFKA-17975 > URL: https://issues.apache.org/jira/browse/KAFKA-17975 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > `KafkaClusterTestKit` binds the port early, so we don't need to use callback > to get binding port now.
[PR] KAFKA-16780: Document the new RLMM#nextSegmentWithTxnIndex API in upgrade notes [kafka]
kamalcph opened a new pull request, #17735: URL: https://github.com/apache/kafka/pull/17735 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16780: Document the new RLMM#nextSegmentWithTxnIndex API in upgrade notes [kafka]
kamalcph commented on PR #17735: URL: https://github.com/apache/kafka/pull/17735#issuecomment-2466566407 Could you share the instructions to preview the upgrade notes?
Re: [PR] KAFKA-17923: Remove old kafka version from e2e [kafka]
FrankYang0529 commented on code in PR #17673: URL: https://github.com/apache/kafka/pull/17673#discussion_r1835570270 ## tests/kafkatest/sanity_checks/test_performance_services.py: ## @@ -38,15 +38,9 @@ def setUp(self): self.zk.start() @cluster(num_nodes=5) -# We are keeping 0.8.2 here so that we don't inadvertently break support for it. Since this is just a sanity check, -# the overhead should be manageable. -@parametrize(version=str(LATEST_0_8_2), new_consumer=False) -@parametrize(version=str(LATEST_0_9), new_consumer=False) -@parametrize(version=str(LATEST_0_9)) -@parametrize(version=str(LATEST_1_1), new_consumer=False) -@cluster(num_nodes=5) +@parametrize(version=str(LATEST_2_1), new_consumer=False) @matrix(version=[str(DEV_BRANCH)], metadata_quorum=quorum.all) Review Comment: @chia7712, I found the root cause. In 2.1.1, we still use broker-list, not bootstrap-server. https://github.com/apache/kafka/blob/21234bee31165527859b46ea48c46b76532f7a37/core/src/main/scala/kafka/tools/ConsumerPerformance.scala#L209-L212 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17779: Refactor Flaky RLM Test [kafka]
kamalcph commented on PR #17724: URL: https://github.com/apache/kafka/pull/17724#issuecomment-2466572792 The patch LGTM, will wait for the CI to complete.
Re: [PR] KAFKA-17872: Update consumed offsets on records with invalid timestamp [kafka]
mjsax merged PR #17710: URL: https://github.com/apache/kafka/pull/17710
Re: [PR] KAFKA-17779: Refactor Flaky RLM Test [kafka]
wperlichek commented on PR #17724: URL: https://github.com/apache/kafka/pull/17724#issuecomment-2466602270 > The patch LGTM, will wait for the CI to complete. Thanks for the help. Looks like CI successfully completed!
Re: [PR] KAFKA-17910: Create integration tests for Admin.listGroups and Admin.describeClassicGroups [kafka]
chia7712 commented on code in PR #17712: URL: https://github.com/apache/kafka/pull/17712#discussion_r1835596925 ## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala: ## @@ -2087,6 +2087,128 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } } + @ParameterizedTest + @ValueSource(strings = Array("kraft+kip932")) + def testListGroups(quorum: String): Unit = { +val classicGroupId = "classic_group_id" +val consumerGroupId = "consumer_group_id" +val shareGroupId = "share_group_id" +val testTopicName = "test_topic" + +val classicGroupConfig = new Properties(consumerConfig) +classicGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, classicGroupId) +classicGroupConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name) +val classicGroup = createConsumer(configOverrides = classicGroupConfig) + +val consumerGroupConfig = new Properties(consumerConfig) +consumerGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId) +consumerGroupConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name) +val consumerGroup = createConsumer(configOverrides = consumerGroupConfig) + +val shareGroupConfig = new Properties(consumerConfig) +shareGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, shareGroupId) +val shareGroup = createShareConsumer(configOverrides = shareGroupConfig) + +val config = createConfig +client = Admin.create(config) +try { + client.createTopics(Collections.singleton( +new NewTopic(testTopicName, 1, 1.toShort) + )).all().get() + waitForTopics(client, List(testTopicName), List()) + + classicGroup.subscribe(Collections.singleton(testTopicName)) + classicGroup.poll(JDuration.ofMillis(1000)) + consumerGroup.subscribe(Collections.singleton(testTopicName)) + consumerGroup.poll(JDuration.ofMillis(1000)) + shareGroup.subscribe(Collections.singleton(testTopicName)) + shareGroup.poll(JDuration.ofMillis(1000)) + + TestUtils.waitUntilTrue(() => { +val groups = client.listGroups().all().get() 
+groups.size() == 3 + }, "Expected to find all groups") + + val classicGroupListing = new GroupListing(classicGroupId, Optional.of(GroupType.CLASSIC), "consumer") + val consumerGroupListing = new GroupListing(consumerGroupId, Optional.of(GroupType.CONSUMER), "consumer") + val shareGroupListing = new GroupListing(shareGroupId, Optional.of(GroupType.SHARE), "share") + + var listGroupsResult = client.listGroups() + assertTrue(listGroupsResult.errors().get().isEmpty) + assertEquals(Set(classicGroupListing, consumerGroupListing, shareGroupListing), listGroupsResult.all().get().asScala.toSet) + assertEquals(Set(classicGroupListing, consumerGroupListing, shareGroupListing), listGroupsResult.valid().get().asScala.toSet) + + listGroupsResult = client.listGroups(new ListGroupsOptions().withTypes(java.util.Set.of(GroupType.CLASSIC))) + assertTrue(listGroupsResult.errors().get().isEmpty) + assertEquals(Set(classicGroupListing), listGroupsResult.all().get().asScala.toSet) + assertEquals(Set(classicGroupListing), listGroupsResult.valid().get().asScala.toSet) + + listGroupsResult = client.listGroups(new ListGroupsOptions().withTypes(java.util.Set.of(GroupType.CONSUMER))) + assertTrue(listGroupsResult.errors().get().isEmpty) + assertEquals(Set(consumerGroupListing), listGroupsResult.all().get().asScala.toSet) + assertEquals(Set(consumerGroupListing), listGroupsResult.valid().get().asScala.toSet) + + listGroupsResult = client.listGroups(new ListGroupsOptions().withTypes(java.util.Set.of(GroupType.SHARE))) + assertTrue(listGroupsResult.errors().get().isEmpty) + assertEquals(Set(shareGroupListing), listGroupsResult.all().get().asScala.toSet) + assertEquals(Set(shareGroupListing), listGroupsResult.valid().get().asScala.toSet) +} finally { + Utils.closeQuietly(classicGroup, "classicGroup") + Utils.closeQuietly(consumerGroup, "consumerGroup") + Utils.closeQuietly(shareGroup, "shareGroup") + Utils.closeQuietly(client, "adminClient") +} + } + + @ParameterizedTest + @ValueSource(strings = 
Array("kraft")) + def testDescribeClassicGroups(quorum: String): Unit = { Review Comment: @FrankYang0529, please use `Admin#alterConsumerGroupOffsets` to set a committed offset. The coordinator will create a classic group with an empty protocol, known as a simple consumer group, for you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17779: Refactor Flaky RLM Test [kafka]
kamalcph merged PR #17724: URL: https://github.com/apache/kafka/pull/17724
[jira] [Created] (KAFKA-17978) StreamsUpgradeTest#test_rolling_upgrade_with_2_bounces system tests fail
PoAn Yang created KAFKA-17978: - Summary: StreamsUpgradeTest#test_rolling_upgrade_with_2_bounces system tests fail Key: KAFKA-17978 URL: https://issues.apache.org/jira/browse/KAFKA-17978 Project: Kafka Issue Type: Test Reporter: PoAn Yang Assignee: PoAn Yang Run `TC_PATHS="tests/kafkatest/tests/streams/streams_upgrade_test.py" /bin/bash tests/docker/run_tests.sh` on trunk branch. The versions which can support fk_joins can't pass `test_rolling_upgrade_with_2_bounces`. {noformat} [INFO:2024-11-09 22:24:00,601]: Triggering test 10 of 19... [INFO:2024-11-09 22:24:00,611]: RunnerClient: Loading test {'directory': '/opt/kafka-dev/tests/kafkatest/tests/streams', 'file_name': 'streams_upgrade_test.py', 'cls_name': 'StreamsUpgradeTest', 'method_name': 'test_rolling_upgrade_with_2_bounces', 'injected_args': {'from_version': '3.4.1', 'metadata_quorum': 'COMBINED_KRAFT'}} [INFO:2024-11-09 22:24:00,619]: RunnerClient: kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.4.1.metadata_quorum=COMBINED_KRAFT: on run 1/1 [INFO:2024-11-09 22:24:00,621]: RunnerClient: kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.4.1.metadata_quorum=COMBINED_KRAFT: Setting up... [INFO:2024-11-09 22:24:00,623]: RunnerClient: kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.4.1.metadata_quorum=COMBINED_KRAFT: Running... [INFO:2024-11-09 22:26:26,343]: RunnerClient: kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.4.1.metadata_quorum=COMBINED_KRAFT: Tearing down... 
[INFO:2024-11-09 22:27:47,017]: RunnerClient: kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.4.1.metadata_quorum=COMBINED_KRAFT: FAIL: TimeoutError("Never saw output 'processed [0-9]* records from topic=data' on ducker@ducker07") Traceback (most recent call last): File "/usr/local/lib/python3.7/dist-packages/ducktape/tests/runner_client.py", line 351, in _do_run data = self.run_test() File "/usr/local/lib/python3.7/dist-packages/ducktape/tests/runner_client.py", line 411, in run_test return self.test_context.function(self.test) File "/usr/local/lib/python3.7/dist-packages/ducktape/mark/_mark.py", line 438, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/opt/kafka-dev/tests/kafkatest/tests/streams/streams_upgrade_test.py", line 137, in test_rolling_upgrade_with_2_bounces self.do_stop_start_bounce(p, from_version[:-2], to_version, counter, extra_properties) File "/opt/kafka-dev/tests/kafkatest/tests/streams/streams_upgrade_test.py", line 402, in do_stop_start_bounce err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node.account)) File "/usr/local/lib/python3.7/dist-packages/ducktape/cluster/remoteaccount.py", line 754, in wait_until allow_fail=True) == 0, **kwargs) File "/usr/local/lib/python3.7/dist-packages/ducktape/utils/util.py", line 58, in wait_until raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from last_exception ducktape.errors.TimeoutError: Never saw output 'processed [0-9]* records from topic=data' on ducker@ducker07 [WARNING:2024-11-09 22:27:47,017]: RunnerClient: kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.4.1.metadata_quorum=COMBINED_KRAFT: Test requested 6 nodes, used only 5 [INFO:2024-11-09 22:27:47,017]: RunnerClient: 
kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.4.1.metadata_quorum=COMBINED_KRAFT: Data: None [INFO:2024-11-09 22:27:47,124]: ~ [INFO:2024-11-09 22:27:47,125]: Triggering test 11 of 19... [INFO:2024-11-09 22:27:47,134]: RunnerClient: Loading test {'directory': '/opt/kafka-dev/tests/kafkatest/tests/streams', 'file_name': 'streams_upgrade_test.py', 'cls_name': 'StreamsUpgradeTest', 'method_name': 'test_rolling_upgrade_with_2_bounces', 'injected_args': {'from_version': '3.5.2', 'metadata_quorum': 'COMBINED_KRAFT'}} [INFO:2024-11-09 22:27:47,142]: RunnerClient: kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.5.2.metadata_quorum=COMBINED_KRAFT: on run 1/1 [INFO:2024-11-09 22:27:47,144]: RunnerClient: kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.5.2.metadata_quorum=COMBINED_KRAFT: Setting up... [INFO:2024-11-09 22:27:47,146]: RunnerClient: kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_
Re: [PR] KAFKA-17923: Remove old kafka version from e2e [kafka]
FrankYang0529 commented on PR #17673: URL: https://github.com/apache/kafka/pull/17673#issuecomment-2466613195 Hi @chia7712, I think this PR is ready.
* The failing cases in `client_compatibility_features_test.py` and `client_compatibility_produce_consume_test.py` (versions 2.1.1, 2.2.2, and 2.3.1) will be fixed in https://issues.apache.org/jira/browse/KAFKA-17888.
* The failing cases in `streams_upgrade_test.py` also fail on the trunk branch. I have created https://issues.apache.org/jira/browse/KAFKA-17978 as a follow-up.
Re: [PR] KAFKA-17925: Convert Kafka Client integration tests to use KRaft [kafka]
kirktrue commented on code in PR #17670: URL: https://github.com/apache/kafka/pull/17670#discussion_r1835553656 ## core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala: ## @@ -204,10 +205,19 @@ abstract class QuorumTestHarness extends Logging { TestInfoUtils.isShareGroupTest(testInfo) } - def maybeGroupProtocolSpecified(testInfo: TestInfo): Option[GroupProtocol] = { + def maybeGroupProtocolSpecified(): Option[GroupProtocol] = { TestInfoUtils.maybeGroupProtocolSpecified(testInfo) } + def groupProtocolFromTestParameters() = { +val gp = maybeGroupProtocolSpecified() + +if (gp.isEmpty) + throw new IllegalStateException("Please specify the group.protocol configuration when creating a KafkaConsumer") Review Comment: I updated the error message to `Please specify the "groupProtocol" parameter when writing the test`. I didn't use `groupProtocol=consumer` in the error message because the value of `groupProtocol` can be `classic` or `consumer`. PTAL. Thanks!
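The required-parameter check discussed in this comment can be sketched in Java as follows. This is only an illustration of the pattern (an optional lookup that fails fast with the updated message); the class `GroupProtocolParam`, its nested enum, and the string-based lookup are hypothetical stand-ins, not the Scala test harness code from the PR.

```java
import java.util.Locale;
import java.util.Optional;

// Hypothetical illustration; not the QuorumTestHarness implementation.
final class GroupProtocolParam {

    // Stand-in for Kafka's GroupProtocol enum, limited to the two values named in the comment.
    enum GroupProtocol { CLASSIC, CONSUMER }

    // Returns the protocol if the test supplied one, otherwise an empty Optional.
    static Optional<GroupProtocol> maybeGroupProtocolSpecified(String raw) {
        if (raw == null || raw.isEmpty())
            return Optional.empty();
        return Optional.of(GroupProtocol.valueOf(raw.toUpperCase(Locale.ROOT)));
    }

    // Fails fast with the message quoted in the review when the parameter is missing.
    static GroupProtocol groupProtocolFromTestParameters(String raw) {
        return maybeGroupProtocolSpecified(raw).orElseThrow(() ->
            new IllegalStateException("Please specify the \"groupProtocol\" parameter when writing the test"));
    }
}
```

Because the value may be either `classic` or `consumer`, the message names only the parameter and not a specific value.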
Re: [PR] KAFKA-15931: Reopen TransactionIndex if channel is closed [kafka]
kamalcph commented on code in PR #15241: URL: https://github.com/apache/kafka/pull/15241#discussion_r1835574783 ## storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java: ## @@ -238,35 +207,137 @@ private Iterable iterable(Supplier allocate) @Override public boolean hasNext() { try { -return channel.position() - position.value >= AbortedTxn.TOTAL_SIZE; +return txnFile.currentPosition() - position.value >= AbortedTxn.TOTAL_SIZE; } catch (IOException e) { -throw new KafkaException("Failed read position from the transaction index " + file.getAbsolutePath(), e); +throw new KafkaException("Failed read position from the transaction index " + txnFile.path().toAbsolutePath(), e); } } @Override public AbortedTxnWithPosition next() { try { ByteBuffer buffer = allocate.get(); -Utils.readFully(channel, buffer, position.value); +txnFile.read(buffer, position.value); buffer.flip(); AbortedTxn abortedTxn = new AbortedTxn(buffer); if (abortedTxn.version() > AbortedTxn.CURRENT_VERSION) throw new KafkaException("Unexpected aborted transaction version " + abortedTxn.version() -+ " in transaction index " + file.getAbsolutePath() + ", current version is " ++ " in transaction index " + txnFile.path().toAbsolutePath() + ", current version is " + AbortedTxn.CURRENT_VERSION); AbortedTxnWithPosition nextEntry = new AbortedTxnWithPosition(abortedTxn, position.value); position.value += AbortedTxn.TOTAL_SIZE; return nextEntry; } catch (IOException e) { // We received an unexpected error reading from the index file. We propagate this as an // UNKNOWN error to the consumer, which will cause it to retry the fetch. 
-throw new KafkaException("Failed to read from the transaction index " + file.getAbsolutePath(), e); +throw new KafkaException("Failed to read from the transaction index " + txnFile.path().toAbsolutePath(), e); } } }; } +// Visible for testing +static class TransactionIndexFile { +// note that the file is not created until we need it +private volatile Path path; +// channel is reopened as long as there are reads and writes +private FileChannel channel; + +TransactionIndexFile(Path path) throws IOException { +this.path = path; + +if (Files.exists(path)) +openChannel(); +} + +private void openChannel() throws IOException { +channel = FileChannel.open( +path, +StandardOpenOption.CREATE, +StandardOpenOption.READ, +StandardOpenOption.WRITE +); +channel.position(channel.size()); +} + +synchronized void updateParentDir(Path parentDir) { +this.path = parentDir.resolve(path.getFileName()); +} + +synchronized void renameTo(Path other) throws IOException { +try { +if (Files.exists(path)) +Utils.atomicMoveWithFallback(path, other, false); +} finally { +this.path = other; +} +} + +synchronized void flush() throws IOException { +if (channel != null) +channel.force(true); +} + +synchronized void closeChannel() throws IOException { +if (channel != null) +channel.close(); +} + +synchronized boolean isChannelOpen() { +return channel != null && channel.isOpen(); +} + +Path path() { +return path; +} + +synchronized void truncate(long position) throws IOException { +if (channel != null) +channel.truncate(position); +} + +boolean exists() { +return Files.exists(path); +} + +boolean deleteIfExists() throws IOException { +closeChannel(); +return Files.deleteIfExists(path()); +} + +void write(ByteBuffer buffer) throws IOException { +Utils.writeFully(channel(), buffer); +} + +void read(ByteBuffer buffer, int position) throws IOException { Review Comment: can we rename this method to `readFully` as it doesn't read byte-by-byte? -- This is an automated message from the Apache Git Service. 
To respond to the m
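The two ideas in this review thread, reopening the channel on demand and reading until the buffer is full, can be sketched in isolation. The class below is a simplified, hypothetical stand-in (`ReopenableFile` is not a Kafka class) that uses only standard `java.nio` calls. Unlike the code under review, it throws `EOFException` when the file ends before the buffer is filled, which is an assumption made here to keep the example strict.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch; not the TransactionIndexFile from the PR.
final class ReopenableFile {
    private final Path path;
    private FileChannel channel; // null until first use

    ReopenableFile(Path path) {
        this.path = path;
    }

    // Opens the channel on first use and reopens it if it was closed in the meantime.
    private FileChannel channel() throws IOException {
        if (channel == null || !channel.isOpen()) {
            channel = FileChannel.open(path,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
            channel.position(channel.size()); // append after existing content
        }
        return channel;
    }

    synchronized void write(ByteBuffer buffer) throws IOException {
        FileChannel ch = channel();
        while (buffer.hasRemaining())
            ch.write(buffer);
    }

    // A single positional read may return fewer bytes than requested, so loop until full.
    synchronized void readFully(ByteBuffer buffer, long position) throws IOException {
        FileChannel ch = channel();
        long pos = position;
        while (buffer.hasRemaining()) {
            int n = ch.read(buffer, pos);
            if (n < 0)
                throw new EOFException("Reached end of " + path + " before filling the buffer");
            pos += n;
        }
    }

    synchronized void close() throws IOException {
        if (channel != null)
            channel.close();
    }
}
```

The name `readFully` reflects the loop: callers get either a completely filled buffer or an exception, never a partial read.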
Re: [PR] KAFKA-15931: Reopen TransactionIndex if channel is closed [kafka]
kamalcph commented on PR #15241: URL: https://github.com/apache/kafka/pull/15241#issuecomment-2466582626 The patch LGTM. It needs a second set of eyes, as we are reopening a closed index. cc @showuon @satishd PTAL.
Re: [PR] KAFKA-17923: Remove old kafka version from e2e [kafka]
FrankYang0529 commented on code in PR #17673: URL: https://github.com/apache/kafka/pull/17673#discussion_r1835576053 ## tests/kafkatest/sanity_checks/test_performance_services.py: ## @@ -38,15 +38,9 @@ def setUp(self): self.zk.start() @cluster(num_nodes=5) -# We are keeping 0.8.2 here so that we don't inadvertently break support for it. Since this is just a sanity check, -# the overhead should be manageable. -@parametrize(version=str(LATEST_0_8_2), new_consumer=False) -@parametrize(version=str(LATEST_0_9), new_consumer=False) -@parametrize(version=str(LATEST_0_9)) -@parametrize(version=str(LATEST_1_1), new_consumer=False) -@cluster(num_nodes=5) +@parametrize(version=str(LATEST_2_1), new_consumer=False) @matrix(version=[str(DEV_BRANCH)], metadata_quorum=quorum.all) Review Comment: New result:
```
> TC_PATHS="tests/kafkatest/sanity_checks/test_performance_services.py" /bin/bash tests/docker/run_tests.sh
SESSION REPORT (ALL TESTS)
ducktape version: 0.12.0
session_id: 2024-11-09--008
run time: 2 minutes 45.572 seconds
tests run: 6
passed: 6
flaky: 0
failed: 0
ignored: 0
```
[jira] [Created] (KAFKA-17976) Remove zk-only test: `FetchRequestTestDowngrade`
Chia-Ping Tsai created KAFKA-17976: -- Summary: Remove zk-only test: `FetchRequestTestDowngrade` Key: KAFKA-17976 URL: https://issues.apache.org/jira/browse/KAFKA-17976 Project: Kafka Issue Type: Sub-task Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai as title and see https://github.com/apache/kafka/pull/17670#discussion_r1835131970 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17925: Convert Kafka Client integration tests to use KRaft [kafka]
chia7712 commented on code in PR #17670: URL: https://github.com/apache/kafka/pull/17670#discussion_r1835561177 ## core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala: ## @@ -43,8 +43,9 @@ class FetchRequestTestDowngrade extends BaseRequestTest { ) } -@Test -def testTopicIdIsRemovedFromFetcherWhenControllerDowngrades(): Unit = { +@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit")) +def testTopicIdIsRemovedFromFetcherWhenControllerDowngrades(quorum: String, groupProtocol: String): Unit = { Review Comment: > The idea is that someone more knowledgeable about the tests would review these and either a) remove the test, or b) update the test to run against Kraft, if possible. Yes, that will be a great reference, similar to #17727. For example, FetchRequestTestDowngrade is for LeaderAndIsrRequest, which is ZK-only, so it's okay to remove it in a follow-up (https://issues.apache.org/jira/browse/KAFKA-17976).
Re: [PR] KAFKA-17923: Remove old kafka version from e2e [kafka]
FrankYang0529 commented on code in PR #17673: URL: https://github.com/apache/kafka/pull/17673#discussion_r1835561960 ## tests/kafkatest/services/performance/consumer_performance.py: ## @@ -77,9 +77,6 @@ def __init__(self, context, num_nodes, kafka, topic, messages, version=DEV_BRANC assert version.consumer_supports_bootstrap_server() or (not new_consumer), \ "new_consumer is only supported if version >= 0.9.0.0, version %s" % str(version) -assert version < V_2_0_0 or new_consumer, \ Review Comment: Hi @chia7712, thanks for the suggestion. I'm curious whether we can assume `new_consumer` is always `true` in `ConsoleConsumer`? Thanks.
[jira] [Assigned] (KAFKA-17976) Remove zk-only test: `FetchRequestTestDowngrade`
[ https://issues.apache.org/jira/browse/KAFKA-17976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Chuan Yu reassigned KAFKA-17976: - Assignee: Chia-Chuan Yu (was: Chia-Ping Tsai) > Remove zk-only test: `FetchRequestTestDowngrade` > - > > Key: KAFKA-17976 > URL: https://issues.apache.org/jira/browse/KAFKA-17976 > Project: Kafka > Issue Type: Sub-task >Reporter: Chia-Ping Tsai >Assignee: Chia-Chuan Yu >Priority: Major > > as title and see > https://github.com/apache/kafka/pull/17670#discussion_r1835131970 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-17925) Convert Kafka Client integration tests to use KRaft
[ https://issues.apache.org/jira/browse/KAFKA-17925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-17925. Resolution: Fixed > Convert Kafka Client integration tests to use KRaft > --- > > Key: KAFKA-17925 > URL: https://issues.apache.org/jira/browse/KAFKA-17925 > Project: Kafka > Issue Type: Task > Components: clients >Affects Versions: 4.0.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: integration-test > Fix For: 4.0.0 > > > Update pertinent integration tests to use KRaft and not Zookeeper. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17925: Convert Kafka Client integration tests to use KRaft [kafka]
chia7712 commented on PR #17670: URL: https://github.com/apache/kafka/pull/17670#issuecomment-2466555239 `SaslPlainPlaintextConsumerTest`, `ConsumerBounceTest`, and `TransactionsExpirationTest` all pass on my local machine.
Re: [PR] KAFKA-17872: Update consumed offsets on records with invalid timestamp [kafka]
mjsax commented on PR #17710: URL: https://github.com/apache/kafka/pull/17710#issuecomment-2466555064 Merged to `trunk` and cherry-picked to `3.9`, `3.8`, and `3.7` branches.
Re: [PR] KAFKA-17925: Convert Kafka Client integration tests to use KRaft [kafka]
chia7712 merged PR #17670: URL: https://github.com/apache/kafka/pull/17670
[PR] KAFKA-17688 Move TransactionsTest to storage module [kafka]
m1a2st opened a new pull request, #17732: URL: https://github.com/apache/kafka/pull/17732 Jira: https://issues.apache.org/jira/browse/KAFKA-17688
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[PR] KAFKA-17801: RemoteLogManager may compute inaccurate upperBoundOffset for aborted txns [kafka]
kamalcph opened a new pull request, #17733: URL: https://github.com/apache/kafka/pull/17733 This is a cherry-pick of #17676 PR
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Reopened] (KAFKA-17801) RemoteLogManager may compute inaccurate upperBoundOffset for aborted txns
[ https://issues.apache.org/jira/browse/KAFKA-17801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kamal Chandraprakash reopened KAFKA-17801:
--
> RemoteLogManager may compute inaccurate upperBoundOffset for aborted txns
> -
>
> Key: KAFKA-17801
> URL: https://issues.apache.org/jira/browse/KAFKA-17801
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 3.6.0
> Reporter: Jun Rao
> Assignee: Kamal Chandraprakash
> Priority: Major
> Fix For: 4.0.0
>
> In RemoteLogManager.read, we compute startPos as the following.
> {code:java}
> startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset);{code}
> This is the position returned by the offset index. The actual position for
> the first batch being read happens in the following, but startPos is not
> updated accordingly.
> {code:java}
> firstBatch = findFirstBatch(remoteLogInputStream, offset);{code}
> We then use the inaccurate startPos to create fetchDataInfo.
> {code:java}
> FetchDataInfo fetchDataInfo = new FetchDataInfo(
> new LogOffsetMetadata(offset, remoteLogSegmentMetadata.startOffset(), startPos),
> MemoryRecords.readableRecords(buffer));{code}
> In addAbortedTransactions(), we use startPos to find the upperBoundOffset to
> retrieve the aborted txns.
> {code:java}
> long upperBoundOffset = offsetIndex.fetchUpperBoundOffset(startOffsetPosition, fetchSize)
> .map(position -> position.offset).orElse(segmentMetadata.endOffset() + 1);{code}
> The inaccurate startPos can lead to inaccurate upperBoundOffset, which leads
> to inaccurate aborted txns returned to the consumer.
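The mismatch described in this issue can be reproduced with a toy model. The sketch below assumes a sparse offset index that holds an entry only for some offsets, so the index lookup can return the position of an earlier batch than the one that actually contains the requested offset. The method names echo the description, but `StartPositionSketch`, `Batch`, and the `TreeMap`-based index are hypothetical simplifications, not Kafka's classes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: a simplified model of a sparse offset index, not RemoteLogManager.
final class StartPositionSketch {

    // Hypothetical batch descriptor: offsets [baseOffset, lastOffset] stored at byte `position`.
    static final class Batch {
        final long baseOffset;
        final long lastOffset;
        final int position;

        Batch(long baseOffset, long lastOffset, int position) {
            this.baseOffset = baseOffset;
            this.lastOffset = lastOffset;
            this.position = position;
        }
    }

    // Sparse index lookup: position of the largest indexed offset <= target, or 0 if none.
    static int lookupPositionForOffset(TreeMap<Long, Integer> sparseIndex, long offset) {
        Map.Entry<Long, Integer> entry = sparseIndex.floorEntry(offset);
        return entry == null ? 0 : entry.getValue();
    }

    // Scans forward from the index position to the first batch that can contain the target offset.
    static Batch findFirstBatch(List<Batch> batches, int fromPosition, long offset) {
        for (Batch b : batches) {
            if (b.position >= fromPosition && b.lastOffset >= offset)
                return b;
        }
        return null;
    }

    public static void main(String[] args) {
        TreeMap<Long, Integer> index = new TreeMap<>();
        index.put(0L, 0);     // only some offsets are indexed
        index.put(20L, 200);
        List<Batch> batches = Arrays.asList(
            new Batch(0, 9, 0), new Batch(10, 19, 100), new Batch(20, 29, 200));

        long offset = 15L;
        int startPos = lookupPositionForOffset(index, offset);
        Batch first = findFirstBatch(batches, startPos, offset);
        System.out.println("index position = " + startPos
            + ", first batch position = " + first.position);
    }
}
```

In this model, any upper bound derived from the index position plus a fetch size starts counting from an earlier byte than the first batch actually returned, which is the kind of inaccuracy the issue describes.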
Re: [PR] KAFKA-17923: Remove old kafka version from e2e [kafka]
chia7712 commented on code in PR #17673: URL: https://github.com/apache/kafka/pull/17673#discussion_r1835567141 ## tests/kafkatest/services/performance/consumer_performance.py: ## @@ -77,9 +77,6 @@ def __init__(self, context, num_nodes, kafka, topic, messages, version=DEV_BRANC assert version.consumer_supports_bootstrap_server() or (not new_consumer), \ "new_consumer is only supported if version >= 0.9.0.0, version %s" % str(version) -assert version < V_2_0_0 or new_consumer, \ Review Comment: Yes, and we can remove `new_consumer` as a follow-up.
Re: [PR] KAFKA-17923: Remove old kafka version from e2e [kafka]
chia7712 commented on code in PR #17673: URL: https://github.com/apache/kafka/pull/17673#discussion_r1835567388 ## tests/kafkatest/services/performance/consumer_performance.py: ## @@ -77,9 +77,6 @@ def __init__(self, context, num_nodes, kafka, topic, messages, version=DEV_BRANC assert version.consumer_supports_bootstrap_server() or (not new_consumer), \ "new_consumer is only supported if version >= 0.9.0.0, version %s" % str(version) -assert version < V_2_0_0 or new_consumer, \ Review Comment: https://issues.apache.org/jira/browse/KAFKA-17977
[jira] [Created] (KAFKA-17977) Remove new_consumer from E2E
Chia-Ping Tsai created KAFKA-17977: -- Summary: Remove new_consumer from E2E Key: KAFKA-17977 URL: https://issues.apache.org/jira/browse/KAFKA-17977 Project: Kafka Issue Type: Sub-task Reporter: Chia-Ping Tsai Assignee: PoAn Yang https://github.com/apache/kafka/pull/17673#discussion_r1834668784 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17923: Remove old kafka version from e2e [kafka]
FrankYang0529 commented on code in PR #17673: URL: https://github.com/apache/kafka/pull/17673#discussion_r1835568028 ## tests/kafkatest/sanity_checks/test_performance_services.py: ## @@ -38,15 +38,9 @@ def setUp(self): self.zk.start() @cluster(num_nodes=5) -# We are keeping 0.8.2 here so that we don't inadvertently break support for it. Since this is just a sanity check, -# the overhead should be manageable. -@parametrize(version=str(LATEST_0_8_2), new_consumer=False) -@parametrize(version=str(LATEST_0_9), new_consumer=False) -@parametrize(version=str(LATEST_0_9)) -@parametrize(version=str(LATEST_1_1), new_consumer=False) -@cluster(num_nodes=5) +@parametrize(version=str(LATEST_2_1), new_consumer=False) @matrix(version=[str(DEV_BRANCH)], metadata_quorum=quorum.all) Review Comment: I tried to update it, but it looks like `LATEST_2_1` can't pass for this case:
```
SESSION REPORT (ALL TESTS)
ducktape version: 0.12.0
session_id: 2024-11-09--005
run time: 3 minutes 6.672 seconds
tests run: 6
passed: 3
flaky: 0
failed: 3
ignored: 0

test_id: kafkatest.sanity_checks.test_performance_services.PerformanceServiceTest.test_version.version=2.1.1.metadata_quorum=COMBINED_KRAFT
status: FAIL
run time: 31.068 seconds
TypeError("'NoneType' object is not subscriptable")
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/ducktape/tests/runner_client.py", line 351, in _do_run
    data = self.run_test()
  File "/usr/local/lib/python3.7/dist-packages/ducktape/tests/runner_client.py", line 411, in run_test
    return self.test_context.function(self.test)
  File "/usr/local/lib/python3.7/dist-packages/ducktape/mark/_mark.py", line 438, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/sanity_checks/test_performance_services.py", line 80, in test_version
    consumer_perf_data = compute_aggregate_throughput(self.consumer_perf)
  File "/opt/kafka-dev/tests/kafkatest/services/performance/performance.py", line 69, in compute_aggregate_throughput
    aggregate_rate = sum([r['records_per_sec'] for r in perf.results])
  File "/opt/kafka-dev/tests/kafkatest/services/performance/performance.py", line 69, in <listcomp>
    aggregate_rate = sum([r['records_per_sec'] for r in perf.results])
TypeError: 'NoneType' object is not subscriptable

test_id: kafkatest.sanity_checks.test_performance_services.PerformanceServiceTest.test_version.version=2.1.1.metadata_quorum=ISOLATED_KRAFT
status: FAIL
run time: 39.534 seconds
TypeError("'NoneType' object is not subscriptable")
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/ducktape/tests/runner_client.py", line 351, in _do_run
    data = self.run_test()
  File "/usr/local/lib/python3.7/dist-packages/ducktape/tests/runner_client.py", line 411, in run_test
    return self.test_context.function(self.test)
  File "/usr/local/lib/python3.7/dist-packages/ducktape/mark/_mark.py", line 438, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/sanity_checks/test_performance_services.py", line 80, in test_version
    consumer_perf_data = compute_aggregate_throughput(self.consumer_perf)
  File "/opt/kafka-dev/tests/kafkatest/services/performance/performance.py", line 69, in compute_aggregate_throughput
    aggregate_rate = sum([r['records_per_sec'] for r in perf.results])
  File "/opt/kafka-dev/tests/kafkatest/services/performance/performance.py", line 69, in <listcomp>
    aggregate_rate = sum([r['records_per_sec'] for r in perf.results])
TypeError: 'NoneType' object is not subscriptable

test_id: kafkatest.sanity_checks.test_performance_services.PerformanceServiceTest.test_version.version=2.1.1.metadata_quorum=ZK
status: FAIL
run time: 34.182 seconds
TypeError("'NoneType' object is not subscriptable")
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/ducktape/tests/runner_client.py", line 351, in _do_run
    data = self.run_test()
  File "/usr/local/lib/python3.7/dist-packages/ducktape/tests/runner_client.py", line 411, in run_test
    return self.test_context.function(self.test)
  File "/usr/local/lib/python3.7/dist-packages/ducktape/mark/_mark.py", line 438, in wr
```
Re: [PR] KAFKA-17663: Add metadata caching in PartitionLeaderStrategy [kafka]
chia7712 commented on code in PR #17367: URL: https://github.com/apache/kafka/pull/17367#discussion_r1835288947 ## clients/src/test/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategyIntegrationTest.java: ## @@ -0,0 +1,494 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.admin.internals; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.LeaderNotAvailableException; +import org.apache.kafka.common.errors.NotLeaderOrFollowerException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.message.ListOffsetsResponseData; +import org.apache.kafka.common.message.MetadataRequestData; +import org.apache.kafka.common.message.MetadataResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.ListOffsetsResponse; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.OptionalInt; +import java.util.Set; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkSet; Review Comment: this method is removed already, so please use `Set.of` instead ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -407,6 +408,7 @@ public class KafkaAdminClient extends AdminClient { private final ExponentialBackoff retryBackoff; private final boolean clientTelemetryEnabled; private final MetadataRecoveryStrategy metadataRecoveryStrategy; +private final 
Map partitionLeaderCache; Review Comment: Do we need to manage the memory usage of this cache if it's being used to optimize long-running admin operations? Maybe we could set a size limit, or clean it up when the metadata expires (reusing the existing `metadata.max.age.ms` configuration)? ## clients/src/main/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategy.java: ## @@ -195,4 +199,92 @@ public LookupResult handleResponse( return new LookupResult<>(failed, mapped); } +/** + * This subclass of {@link AdminApiFuture} starts with a pre-fetched map for keys to broker ids which can be + * used to optimise the request. The map is kept up to date as metadata is fetching as this request is processed. + * This is useful for situations in which {@link PartitionLeaderStrategy} is used + * repeatedly, such as a sequence of identical calls to + * {@link org.apache.kafka.clients.admin.Admin#listOffsets(Map, org.apache.kafka.clients.admin.ListOffsetsOptions)}. + */ +public static class PartitionLeaderFuture implements AdminApiFuture { +private final Set requestKeys; +private final Map partitionLeaderCache; +private final Map> futures; + +public PartitionLeaderFuture(Set requestKeys, Map partitionLeaderCache) { +this.requestKeys = requestKeys; +this.partitionLeaderCache = partitionLeaderCache; +this.futures = requestKeys.stream().collect(Collectors.toMap( +Function.identity(), +k -> new KafkaFutureImpl<>() +)); +} + +@Override +public Set lookupKeys() { +return futures.keySet(); +} + +@Override +public Set uncachedLookupKeys() { +Set keys = new HashSet<>(); +requestKeys.forEach(tp
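One way to picture the size limit raised in the review question about `partitionLeaderCache` is an LRU-bounded map. The sketch below uses the standard `LinkedHashMap` access-order mode with `removeEldestEntry`. `BoundedLeaderCache` and its `maxEntries` parameter are hypothetical names for illustration only, and nothing here is taken from the PR. `LinkedHashMap` is not thread-safe, so a shared cache would also need external synchronization.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of a size-bounded LRU cache; not the cache used by KafkaAdminClient.
final class BoundedLeaderCache<K, V> extends LinkedHashMap<K, V> {
    private static final long serialVersionUID = 1L;

    private final int maxEntries;

    BoundedLeaderCache(int maxEntries) {
        // accessOrder = true: iteration order runs from least to most recently accessed
        super(16, 0.75f, true);
        this.maxEntries = maxEntries;
    }

    // Called by put(); returning true evicts the least recently accessed entry.
    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries;
    }
}
```

A time-based alternative, such as expiring entries after `metadata.max.age.ms` as the comment suggests, would need a timestamp per entry and is not shown here.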
Re: [PR] KAFKA-17970: Moving some share purgatory classes from core to share module [kafka]
chia7712 merged PR #17722: URL: https://github.com/apache/kafka/pull/17722
[jira] [Resolved] (KAFKA-17970) Move some purgatory classes from core to share
[ https://issues.apache.org/jira/browse/KAFKA-17970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-17970. Fix Version/s: 4.0.0 Resolution: Fixed > Move some purgatory classes from core to share > -- > > Key: KAFKA-17970 > URL: https://issues.apache.org/jira/browse/KAFKA-17970 > Project: Kafka > Issue Type: Sub-task >Reporter: Apoorv Mittal >Assignee: Apoorv Mittal >Priority: Major > Fix For: 4.0.0 > > > As part of PR: [https://github.com/apache/kafka/pull/17636] where purgatory > has been moved from core to server-common hence move some existing classes > used in Share Fetch to Share module. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-17975) Remove ControllerQuorumVotersFutureManager
Chia-Ping Tsai created KAFKA-17975: -- Summary: Remove ControllerQuorumVotersFutureManager Key: KAFKA-17975 URL: https://issues.apache.org/jira/browse/KAFKA-17975 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai `KafkaClusterTestKit` binds the port early, so we don't need to use callback to get binding port now. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-17837) Rewrite DeleteTopicTest
[ https://issues.apache.org/jira/browse/KAFKA-17837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-17837. Fix Version/s: 4.0.0 Resolution: Fixed > Rewrite DeleteTopicTest > > > Key: KAFKA-17837 > URL: https://issues.apache.org/jira/browse/KAFKA-17837 > Project: Kafka > Issue Type: Test >Reporter: Chia-Ping Tsai >Assignee: TengYao Chi >Priority: Major > Fix For: 4.0.0 > > > 1. remove zk tests > 2. rewrite it by java > 3. use new test infra -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17837: Rewrite DeleteTopicTest [kafka]
chia7712 merged PR #17579: URL: https://github.com/apache/kafka/pull/17579
Re: [PR] Move StopPartition to server-common [kafka]
chia7712 merged PR #17704: URL: https://github.com/apache/kafka/pull/17704
Re: [PR] KAFKA-17923: Remove old kafka version from e2e [kafka]
chia7712 commented on code in PR #17673: URL: https://github.com/apache/kafka/pull/17673#discussion_r1835508337 ## tests/kafkatest/sanity_checks/test_performance_services.py: ## @@ -38,15 +38,9 @@ def setUp(self): self.zk.start() @cluster(num_nodes=5) -# We are keeping 0.8.2 here so that we don't inadvertently break support for it. Since this is just a sanity check, -# the overhead should be manageable. -@parametrize(version=str(LATEST_0_8_2), new_consumer=False) -@parametrize(version=str(LATEST_0_9), new_consumer=False) -@parametrize(version=str(LATEST_0_9)) -@parametrize(version=str(LATEST_1_1), new_consumer=False) -@cluster(num_nodes=5) +@parametrize(version=str(LATEST_2_1), new_consumer=False) @matrix(version=[str(DEV_BRANCH)], metadata_quorum=quorum.all) Review Comment: Could you please use `@matrix(version=[str(LATEST_2_1), str(DEV_BRANCH)], metadata_quorum=quorum.all)` instead?
Re: [PR] KAFKA-15549: Bump swagger dependency version [kafka]
chia7712 merged PR #17730: URL: https://github.com/apache/kafka/pull/17730
[jira] [Resolved] (KAFKA-15549) Bump swagger dependency version
[ https://issues.apache.org/jira/browse/KAFKA-15549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-15549. Resolution: Fixed > Bump swagger dependency version > --- > > Key: KAFKA-15549 > URL: https://issues.apache.org/jira/browse/KAFKA-15549 > Project: Kafka > Issue Type: Improvement >Reporter: Mickael Maison >Assignee: Jhen-Yung Hsu >Priority: Major > Fix For: 4.0.0
Re: [PR] Move StopPartition to server-common [kafka]
mimaison commented on PR #17704: URL: https://github.com/apache/kafka/pull/17704#issuecomment-2466439939 Thanks for the review!
Re: [PR] KAFKA-17925: Convert Kafka Client integration tests to use KRaft [kafka]
chia7712 commented on PR #17670: URL: https://github.com/apache/kafka/pull/17670#issuecomment-2466441612 @kirktrue Could you please resolve the conflicts and address the comment (https://github.com/apache/kafka/pull/17670#discussion_r1835100820)? I'd like to merge this PR.
Re: [PR] KAFKA-17053: Restructure build.gradle to configure publishing last [kafka]
chia7712 commented on code in PR #16950: URL: https://github.com/apache/kafka/pull/16950#discussion_r1835503294 ## build.gradle: ## @@ -41,7 +41,7 @@ plugins { id "com.github.spotbugs" version '6.0.25' apply false id 'org.scoverage' version '8.0.3' apply false - id 'io.github.goooler.shadow' version '8.1.3' apply false + id 'io.github.goooler.shadow' version '8.1.8' apply false Review Comment: Could you try the successor (https://github.com/GradleUp/shadow)? It may fix issues such as https://issues.apache.org/jira/browse/KAFKA-17052 and https://issues.apache.org/jira/browse/KAFKA-16359
Re: [PR] KAFKA-17615: Remove KafkaServer references in tests [kafka]
mimaison commented on PR #17365: URL: https://github.com/apache/kafka/pull/17365#issuecomment-2466454973 @cmccabe Can you rebase to resolve the conflicts? Thanks
Re: [PR] KAFKA-17787: Removed --zookeeper option and logic from ConfigCommand [kafka]
mimaison commented on code in PR #17507: URL: https://github.com/apache/kafka/pull/17507#discussion_r1835514323 ## core/src/test/java/kafka/admin/ConfigCommandTest.java: ## @@ -508,100 +426,56 @@ public void testExpectedEntityTypeNames(List expectedTypes, List assertEquals(createOpts.entityTypes().toSeq(), seq(expectedTypes)); assertEquals(createOpts.entityNames().toSeq(), seq(expectedNames)); } - -public void doTestOptionEntityTypeNames(boolean zkConfig) { -List connectOpts = zkConfig -? Arrays.asList("--zookeeper", ZK_CONNECT) -: Arrays.asList("--bootstrap-server", "localhost:9092"); - -// zookeeper config only supports "users" and "brokers" entity type -if (!zkConfig) { - testExpectedEntityTypeNames(Collections.singletonList(ConfigType.TOPIC), Collections.singletonList("A"), connectOpts, "--entity-type", "topics", "--entity-name", "A"); - testExpectedEntityTypeNames(Collections.singletonList(ConfigType.IP), Collections.singletonList("1.2.3.4"), connectOpts, "--entity-name", "1.2.3.4", "--entity-type", "ips"); - testExpectedEntityTypeNames(Collections.singletonList(ConfigType.CLIENT_METRICS), Collections.singletonList("A"), connectOpts, "--entity-type", "client-metrics", "--entity-name", "A"); - testExpectedEntityTypeNames(Collections.singletonList(ConfigType.GROUP), Collections.singletonList("A"), connectOpts, "--entity-type", "groups", "--entity-name", "A"); -testExpectedEntityTypeNames(Arrays.asList(ConfigType.USER, ConfigType.CLIENT), Arrays.asList("A", ""), connectOpts, -"--entity-type", "users", "--entity-type", "clients", "--entity-name", "A", "--entity-default"); -testExpectedEntityTypeNames(Arrays.asList(ConfigType.USER, ConfigType.CLIENT), Arrays.asList("", "B"), connectOpts, -"--entity-default", "--entity-name", "B", "--entity-type", "users", "--entity-type", "clients"); - testExpectedEntityTypeNames(Collections.singletonList(ConfigType.TOPIC), Collections.singletonList("A"), connectOpts, "--topic", "A"); - 
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.IP), Collections.singletonList("1.2.3.4"), connectOpts, "--ip", "1.2.3.4"); - testExpectedEntityTypeNames(Collections.singletonList(ConfigType.GROUP), Collections.singletonList("A"), connectOpts, "--group", "A"); -testExpectedEntityTypeNames(Arrays.asList(ConfigType.CLIENT, ConfigType.USER), Arrays.asList("B", "A"), connectOpts, "--client", "B", "--user", "A"); -testExpectedEntityTypeNames(Arrays.asList(ConfigType.CLIENT, ConfigType.USER), Arrays.asList("B", ""), connectOpts, "--client", "B", "--user-defaults"); -testExpectedEntityTypeNames(Arrays.asList(ConfigType.CLIENT, ConfigType.USER), Collections.singletonList("A"), connectOpts, -"--entity-type", "clients", "--entity-type", "users", "--entity-name", "A"); - testExpectedEntityTypeNames(Collections.singletonList(ConfigType.TOPIC), Collections.emptyList(), connectOpts, "--entity-type", "topics"); - testExpectedEntityTypeNames(Collections.singletonList(ConfigType.IP), Collections.emptyList(), connectOpts, "--entity-type", "ips"); - testExpectedEntityTypeNames(Collections.singletonList(ConfigType.GROUP), Collections.emptyList(), connectOpts, "--entity-type", "groups"); - testExpectedEntityTypeNames(Collections.singletonList(ConfigType.CLIENT_METRICS), Collections.emptyList(), connectOpts, "--entity-type", "client-metrics"); -} - +@Test Review Comment: Nit: can we add a newline between both tests? 
## core/src/main/scala/kafka/admin/ConfigCommand.scala: ## @@ -952,19 +644,11 @@ object ConfigCommand extends Logging { val hasEntityDefault = entityNames.exists(_.isEmpty) val numConnectOptions = (if (options.has(bootstrapServerOpt)) 1 else 0) + -(if (options.has(bootstrapControllerOpt)) 1 else 0) + -(if (options.has(zkConnectOpt)) 1 else 0) +(if (options.has(bootstrapControllerOpt)) 1 else 0) if (numConnectOptions == 0) -throw new IllegalArgumentException("One of the required --bootstrap-server, --boostrap-controller, or --zookeeper arguments must be specified") +throw new IllegalArgumentException("One of the required --bootstrap-server or --bootstrap-controller arguments must be specified") Review Comment: I think this line is no longer reachable, since we now throw on line 627 if neither --bootstrap-server nor --bootstrap-controller is present.
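To illustrate the reachability point in the review comment above, here is a minimal Java sketch. It is not the actual `ConfigCommand` code (which is Scala): the class `ConnectOptionCheck`, the method `countConnectOptions`, and the use of a plain `Set<String>` for parsed options are all hypothetical. It assumes an earlier guard already throws when neither connect option is present, which makes the later `numConnectOptions == 0` branch dead.

```java
import java.util.Set;

public class ConnectOptionCheck {
    // Hypothetical stand-in for the argument validation; not the real ConfigCommand logic.
    static int countConnectOptions(Set<String> options) {
        boolean hasServer = options.contains("--bootstrap-server");
        boolean hasController = options.contains("--bootstrap-controller");

        // Earlier guard (assumed to correspond to the check the reviewer cites).
        if (!hasServer && !hasController) {
            throw new IllegalArgumentException(
                "One of the required --bootstrap-server or --bootstrap-controller arguments must be specified");
        }

        int numConnectOptions = (hasServer ? 1 : 0) + (hasController ? 1 : 0);

        // Dead branch: the guard above has already thrown whenever the count would be 0.
        if (numConnectOptions == 0) {
            throw new IllegalStateException("never reached");
        }
        return numConnectOptions;
    }

    public static void main(String[] args) {
        System.out.println(countConnectOptions(Set.of("--bootstrap-server")));
        try {
            countConnectOptions(Set.of("--entity-type"));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected by the earlier guard");
        }
    }
}
```

Under these assumptions the second check can be dropped without changing behavior, because every input that would reach it with a count of 0 has already been rejected.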
[PR] KAFKA-17601: Inter-broker connections do not expose their clientSoftwareName and clientSoftwareVersion tags [kafka]
FrankYang0529 opened a new pull request, #17731: URL: https://github.com/apache/kafka/pull/17731 When Kafka brokers connect to other brokers, this information is not properly populated: we see the "unknown" value for both `ClientSoftwareName` and `ClientSoftwareVersion`. The reason is that we updated `ClientInformation` in `ChannelMetadataRegistry` after we built `RequestContext`. We should initialize `ClientInformation` before setting up `RequestContext`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
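The ordering problem described in this PR can be sketched in Java as follows. This is an illustration only: `Registry` and `Context` are hypothetical stand-ins for `ChannelMetadataRegistry` and `RequestContext`, and the sketch assumes the context captures the client software name once, at construction time, which is what the PR description implies but does not show.

```java
public class ClientInfoOrdering {
    // Hypothetical stand-in for ChannelMetadataRegistry.
    static final class Registry {
        private String softwareName = "unknown"; // default until client information is registered

        void register(String name) {
            this.softwareName = name;
        }

        String softwareName() {
            return softwareName;
        }
    }

    // Hypothetical stand-in for RequestContext; assumed to snapshot the value when built.
    static final class Context {
        final String softwareName;

        Context(Registry registry) {
            this.softwareName = registry.softwareName();
        }
    }

    // Ordering described as the bug: the context is built before the registry is updated.
    static String buildThenRegister(String name) {
        Registry registry = new Registry();
        Context context = new Context(registry);
        registry.register(name);
        return context.softwareName;
    }

    // Ordering proposed as the fix: the registry is updated before the context is built.
    static String registerThenBuild(String name) {
        Registry registry = new Registry();
        registry.register(name);
        Context context = new Context(registry);
        return context.softwareName;
    }

    public static void main(String[] args) {
        System.out.println(buildThenRegister("apache-kafka-java")); // prints "unknown"
        System.out.println(registerThenBuild("apache-kafka-java")); // prints "apache-kafka-java"
    }
}
```

If the real context instead read the registry lazily, the order would not matter; the sketch only covers the snapshot-at-construction case.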
Re: [PR] MINOR: Delete unused member from KafkaAdminClient [kafka]
chia7712 merged PR #17729: URL: https://github.com/apache/kafka/pull/17729