Re: [PR] KAFKA-18876: 4.0 documentation improvement [kafka]
mingdaoy commented on PR #19065: URL: https://github.com/apache/kafka/pull/19065#issuecomment-2694720187 @chia7712 @dajac
Re: [PR] KAFKA-18867: add tests to describe topic configs with empty name [kafka]
Rancho-7 commented on code in PR #19075: URL: https://github.com/apache/kafka/pull/19075#discussion_r1977580342

## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
@@ -1626,6 +1626,11 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
 val existingTopic = new ConfigResource(ConfigResource.Type.TOPIC, topic)
 client.describeConfigs(Collections.singletonList(existingTopic)).values.get(existingTopic).get()
+val defaultTopic = new ConfigResource(ConfigResource.Type.TOPIC, "")
+val describeResult0 = client.describeConfigs(Collections.singletonList(defaultTopic))
+
+assertTrue(assertThrows(classOf[ExecutionException], () => describeResult0.values.get(defaultTopic).get).getCause.isInstanceOf[InvalidTopicException])

Review Comment: Thanks for the review! Fixed it.
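For readers following the thread, here is a rough Java Admin-client sketch of the same assertion pattern being discussed: describing configs for an empty topic name fails, and the broker error surfaces as the cause of an `ExecutionException`. The test above is the real Scala integration test; this sketch is only illustrative, and the bootstrap address is an assumption.

```java
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidTopicException;

public class DescribeEmptyTopicNameSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumed broker address; replace with a real bootstrap server.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // An empty topic name is invalid, so describing its configs should fail.
            ConfigResource emptyTopic = new ConfigResource(ConfigResource.Type.TOPIC, "");
            DescribeConfigsResult result = admin.describeConfigs(List.of(emptyTopic));
            try {
                result.values().get(emptyTopic).get();
            } catch (ExecutionException e) {
                // The admin client wraps the broker-side error; the cause is what the test asserts on.
                System.out.println("Cause is InvalidTopicException: "
                        + (e.getCause() instanceof InvalidTopicException));
            }
        }
    }
}
```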
Re: [PR] KAFKA-18867: add tests to describe topic configs with empty name [kafka]
Rancho-7 commented on code in PR #19075: URL: https://github.com/apache/kafka/pull/19075#discussion_r1977584859

## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
@@ -1626,6 +1626,11 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
 val existingTopic = new ConfigResource(ConfigResource.Type.TOPIC, topic)
 client.describeConfigs(Collections.singletonList(existingTopic)).values.get(existingTopic).get()
+val defaultTopic = new ConfigResource(ConfigResource.Type.TOPIC, "")
+val describeResult0 = client.describeConfigs(Collections.singletonList(defaultTopic))

Review Comment: Thanks for the suggestion! I agree that would make the code cleaner. Fixed it.
Re: [PR] KAFKA-18713: Fix left join bug by using DELETE_KEY_NO_PROPAGATE [kafka]
patronovski commented on PR #18887: URL: https://github.com/apache/kafka/pull/18887#issuecomment-2694740863 Any updates on this?
Re: [PR] KAFKA-18876: 4.0 documentation improvement [kafka]
dajac commented on code in PR #19065: URL: https://github.com/apache/kafka/pull/19065#discussion_r1977703481

## docs/ops.html:
@@ -1343,7 +1343,7 @@ Check and wait until the Lag is small for a majority of the controllers. If the leader's end offset is not increasing, you can wait until the lag is 0 for a majority; otherwise, you can pick the latest leader end offset and wait until all replicas have reached it. Check and wait until the LastFetchTimestamp and LastCaughtUpTimestamp are close to each other for the majority of the controllers. At this point it is safer to format the controller's metadata log directory. This can be done by running the kafka-storage.sh command.
- $ bin/kafka-storage.sh format --cluster-id uuid --config server_properties
+ $ bin/kafka-storage.sh format --cluster-id uuid --config config/server_properties

Review Comment: Should it be `config/server.properties`?
Re: [PR] KAFKA-18713: Fix left join bug by using DELETE_KEY_NO_PROPAGATE [kafka]
nilmadhab commented on PR #18887: URL: https://github.com/apache/kafka/pull/18887#issuecomment-2694913447
> Any updates on this?

I have created a new PR: https://github.com/apache/kafka/pull/19005
Re: [PR] KAFKA-18142 switch Kafka's Gradle build shadow plugin to `com.gradleup.shadow` (and upgrade plugin version) [kafka]
dejan2609 commented on code in PR #18018: URL: https://github.com/apache/kafka/pull/18018#discussion_r1977235728

## build.gradle:
@@ -1916,6 +1913,10 @@ project(':clients') { generator project(':generator') }
+ tasks.withType(GenerateModuleMetadata) {
+enabled = false

Review Comment: Just to sum it up: @poom-kitti's (and my) intention is to keep the exact same behavior for the `kafka-clients` dependency (in external projects, that is), and hence I opted to honor his findings and to [disable gradle module metadata publication](https://docs.gradle.org/8.10.2/userguide/publishing_gradle_module_metadata.html#sub:disabling-gmm-publication) for `project(':clients')` (i.e. for the `kafka-clients` artifacts).

All in all, we ended up in a situation where we are trying to have all three of these things at once:
- switch the shadow plugin (i.e. use the GradleUp/shadow plugin)
- keep exactly the same dependency tree in external projects (with the `kafka-clients` dependency)
- publish gradle module metadata for `project(':clients')` (`kafka-clients` artifacts)

I dare to assume that this could be addressed on the https://github.com/GradleUp/shadow side; maybe @Goooler can provide at least some insights.
Re: [PR] KAFKA-18067: Kafka Streams can leak Producer client under EOS [kafka]
mjsax commented on PR #17931: URL: https://github.com/apache/kafka/pull/17931#issuecomment-2694932880 This PR introduced a regression for EOS and was reverted for the `4.0.0` release: https://github.com/apache/kafka/pull/19078
Re: [PR] HOTFIX: Revert "KAFKA-18067: Kafka Streams can leak Producer client under EOS (#17931)" [kafka]
mjsax commented on PR #19078: URL: https://github.com/apache/kafka/pull/19078#issuecomment-2694935812 This was merged to `trunk` and cherry-picked to the `4.0` branch.
[PR] KAFKA-18909: Move DynamicThreadPool to server module [kafka]
clarkwtc opened a new pull request, #19081: URL: https://github.com/apache/kafka/pull/19081
* Add `DynamicThreadPool.java` to the server module.
* Remove the old DynamicThreadPool object from `DynamicBrokerConfig.scala`.
Re: [PR] KAFKA-18878: Added share session cache and delayed share fetch metrics (KIP-1103) [kafka]
AndrewJSchofield merged PR #19059: URL: https://github.com/apache/kafka/pull/19059
Re: [PR] KAFKA-14121: AlterPartitionReassignments API should allow callers to specify the option of preserving the replication factor [kafka]
clolov commented on PR #18983: URL: https://github.com/apache/kafka/pull/18983#issuecomment-2694686895 Heya, thanks for the updated code! I will have a look at it tomorrow!
Re: [PR] KAFKA-18461: Fix potential NPE in setDelta after map is erased [kafka]
mumrah commented on code in PR #18684: URL: https://github.com/apache/kafka/pull/18684#discussion_r1977651214

## server-common/src/main/java/org/apache/kafka/timeline/Snapshot.java:
@@ -47,7 +47,9 @@ T getDelta(Revertable owner) { }
 void setDelta(Revertable owner, Delta delta) {
-map.put(owner, delta);
+if (map != null) {
+map.put(owner, delta);
+}

Review Comment: Sorry, my comment "sanity checks" was probably a bit too vague.

> However, since we can't guarantee that the caller properly disposes of the object and does not make any further calls which touch map, it would be good to include some sanity checks.

We should not silently ignore a null `map`. That would be a violation of the implied contract here, which is that a Snapshot cannot be used again after `erase` is called. What we can do is add an explicit not-null assertion with `Objects.requireNonNull`. This will still throw an NPE, but it makes the code more explicit.
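A minimal sketch of the `Objects.requireNonNull` approach suggested above. This is not the real `org.apache.kafka.timeline.Snapshot` code; the types are simplified stand-ins, and it only illustrates the use-after-`erase` contract being discussed.

```java
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Objects;

// Simplified stand-ins; the real class uses Revertable owners and Delta values.
class SnapshotSketch {
    private Map<Object, Object> map = new IdentityHashMap<>();

    void setDelta(Object owner, Object delta) {
        // Fail fast with an explicit message instead of silently ignoring the call:
        // touching a Snapshot after erase() is a caller bug and should surface immediately.
        Objects.requireNonNull(map, "Snapshot was already erased").put(owner, delta);
    }

    void erase() {
        map = null; // disposes the snapshot; no further calls are allowed
    }
}
```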
Re: [PR] KAFKA-14121: AlterPartitionReassignments API should allow callers to specify the option of preserving the replication factor [kafka]
brandboat commented on PR #18983: URL: https://github.com/apache/kafka/pull/18983#issuecomment-2694696867
> Heya, thanks for the updated code! I will have a look at it tomorrow!

Thank you!
Re: [PR] KAFKA-14121: AlterPartitionReassignments API should allow callers to specify the option of preserving the replication factor [kafka]
frankvicky commented on code in PR #18983: URL: https://github.com/apache/kafka/pull/18983#discussion_r1977696277

## metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:
@@ -1922,6 +1922,135 @@ public void testReassignPartitions(short version) { assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null, Long.MAX_VALUE)); }
+@ParameterizedTest
+@ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
+public void testAlterPartitionDisallowReplicationFactorChange(short version) {
+MetadataVersion metadataVersion = MetadataVersion.latestTesting();
+ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+.setMetadataVersion(metadataVersion)
+.build();
+ReplicationControlManager replication = ctx.replicationControl;
+ctx.registerBrokers(0, 1, 2, 3);
+ctx.unfenceBrokers(0, 1, 2, 3);
+ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}, new int[] {0, 1, 2}, new int[] {0, 1, 2}});
+
+ControllerResult alterResult =
+replication.alterPartitionReassignments(
+new AlterPartitionReassignmentsRequestData().setTopics(singletonList(
+new ReassignableTopic().setName("foo").setPartitions(asList(
+new ReassignablePartition().setPartitionIndex(0).
+setReplicas(asList(1, 2, 3)),

Review Comment: nit: `List.of`
Re: [PR] KAFKA-18876: 4.0 documentation improvement [kafka]
mingdaoy commented on code in PR #19065: URL: https://github.com/apache/kafka/pull/19065#discussion_r1977695832

## docs/ops.html:
@@ -4187,7 +4187,7 @@ org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor interface and specifying the full class name in the configuration.

Review Comment: That would look like the original ("no line breaks"), which is cut off:
```
implementing the org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor interface and specifying
```
Re: [PR] KAFKA-18617 Allow use of ClusterInstance inside BeforeEach [kafka]
mumrah commented on code in PR #18662: URL: https://github.com/apache/kafka/pull/18662#discussion_r1977721741

## test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/README.md:
@@ -104,7 +104,7 @@ For each generated invocation:
 * Non-static `@AfterEach` methods are called
 * Static `@AfterAll` methods are called
-`@BeforeEach` methods give an opportunity to setup additional test dependencies before the cluster is started.

Review Comment: I rewrote a lot of this readme since it was kind of outdated.
Re: [PR] KAFKA-14121: AlterPartitionReassignments API should allow callers to specify the option of preserving the replication factor [kafka]
frankvicky commented on code in PR #18983: URL: https://github.com/apache/kafka/pull/18983#discussion_r1977728460

## metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:
@@ -1922,6 +1922,135 @@ public void testReassignPartitions(short version) { assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null, Long.MAX_VALUE)); }
+@ParameterizedTest
+@ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
+public void testAlterPartitionDisallowReplicationFactorChange(short version) {
+MetadataVersion metadataVersion = MetadataVersion.latestTesting();
+ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+.setMetadataVersion(metadataVersion)
+.build();
+ReplicationControlManager replication = ctx.replicationControl;
+ctx.registerBrokers(0, 1, 2, 3);
+ctx.unfenceBrokers(0, 1, 2, 3);
+ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}, new int[] {0, 1, 2}, new int[] {0, 1, 2}});
+
+ControllerResult alterResult =
+replication.alterPartitionReassignments(
+new AlterPartitionReassignmentsRequestData().setTopics(singletonList(
+new ReassignableTopic().setName("foo").setPartitions(asList(
+new ReassignablePartition().setPartitionIndex(0).
+setReplicas(asList(1, 2, 3)),

Review Comment: Makes sense.
Re: [PR] KAFKA-18876: 4.0 documentation improvement [kafka]
mingdaoy commented on code in PR #19065: URL: https://github.com/apache/kafka/pull/19065#discussion_r1977729557

## docs/ops.html:
@@ -1343,13 +1343,13 @@ Check and wait until the Lag is small for a majority of the controllers. If the leader's end offset is not increasing, you can wait until the lag is 0 for a majority; otherwise, you can pick the latest leader end offset and wait until all replicas have reached it. Check and wait until the LastFetchTimestamp and LastCaughtUpTimestamp are close to each other for the majority of the controllers. At this point it is safer to format the controller's metadata log directory. This can be done by running the kafka-storage.sh command.
- $ bin/kafka-storage.sh format --cluster-id uuid --config server_properties
+ $ bin/kafka-storage.sh format --cluster-id uuid --config config/server.properties
It is possible for the bin/kafka-storage.sh format command above to fail with a message like Log directory ... is already formatted. This can happen when combined mode is used and only the metadata log directory was lost but not the others. In that case and only in that case, can you run the bin/kafka-storage.sh format command with the --ignore-formatted option.
Start the KRaft controller after formatting the log directories.
- $ bin/kafka-server-start.sh server_properties
+ $ bin/kafka-server-start.sh server.properties

Review Comment: @dajac done.
Re: [PR] KAFKA-14121: AlterPartitionReassignments API should allow callers to specify the option of preserving the replication factor [kafka]
brandboat commented on code in PR #18983: URL: https://github.com/apache/kafka/pull/18983#discussion_r1977713877

## metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:
@@ -1922,6 +1922,135 @@ public void testReassignPartitions(short version) { assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null, Long.MAX_VALUE)); }
+@ParameterizedTest
+@ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
+public void testAlterPartitionDisallowReplicationFactorChange(short version) {
+MetadataVersion metadataVersion = MetadataVersion.latestTesting();
+ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+.setMetadataVersion(metadataVersion)
+.build();
+ReplicationControlManager replication = ctx.replicationControl;
+ctx.registerBrokers(0, 1, 2, 3);
+ctx.unfenceBrokers(0, 1, 2, 3);
+ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}, new int[] {0, 1, 2}, new int[] {0, 1, 2}});
+
+ControllerResult alterResult =
+replication.alterPartitionReassignments(
+new AlterPartitionReassignmentsRequestData().setTopics(singletonList(
+new ReassignableTopic().setName("foo").setPartitions(asList(
+new ReassignablePartition().setPartitionIndex(0).
+setReplicas(asList(1, 2, 3)),

Review Comment: I've given this some thought, and I agree with you that List.of is a better choice. However, since this test class consistently uses Arrays.asList, perhaps we can open a separate minor PR to address this improvement. As you suggested, we can replace asList with List.of in this test class.
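For context on the `List.of` nit: the two factories behave differently, which is why `List.of` is usually preferred in new test code. A small standalone example, unrelated to the PR itself:

```java
import java.util.Arrays;
import java.util.List;

public class ListFactoryDemo {
    public static void main(String[] args) {
        // Arrays.asList returns a fixed-size view backed by an array:
        // elements can be replaced, but the list cannot grow or shrink.
        List<Integer> fromArrays = Arrays.asList(1, 2, 3);
        fromArrays.set(0, 9); // allowed

        // List.of returns a truly immutable list and rejects null elements.
        List<Integer> immutable = List.of(1, 2, 3);
        try {
            immutable.set(0, 9); // throws UnsupportedOperationException
        } catch (UnsupportedOperationException expected) {
            System.out.println("List.of lists are immutable");
        }
    }
}
```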
[jira] [Updated] (KAFKA-18871) KRaft migration rollback causes downtime
[ https://issues.apache.org/jira/browse/KAFKA-18871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban updated KAFKA-18871: - Attachment: controller_logs.zip kraft-rollback-kafka-default-pool-1.zip kraft-rollback-kafka-default-pool-2.zip strimzi-cluster-operator-7bc47d488f-p4ltv.zip > KRaft migration rollback causes downtime > > > Key: KAFKA-18871 > URL: https://issues.apache.org/jira/browse/KAFKA-18871 > Project: Kafka > Issue Type: Bug > Components: kraft, migration >Affects Versions: 3.9.0 >Reporter: Daniel Urban >Priority: Critical > Attachments: cluster-operator.log.zip, controller_logs.zip, > kraft-rollback-bug.zip, kraft-rollback-kafka-default-pool-1.zip, > kraft-rollback-kafka-default-pool-2.zip, > strimzi-cluster-operator-7bc47d488f-p4ltv.zip > > > When testing the KRaft migration rollback feature, found the following > scenario: > # Execute KRaft migration on a 3 broker 3 ZK node cluster to the last step, > but do not finalize the migration. > ## In the test, we put a slow but continuous produce+consume load on the > cluster, with a topic (partitions=3, RF=3, min ISR=2) > # Start the rollback procedure > # First we roll back the brokers from KRaft mode to migration mode (both > controller and ZK configs are set, process roles are removed, > {{zookeeper.metadata.migration.enable}} is true) > # Then we delete the KRaft controllers, delete the /controller znode > # Then we immediately start rolling the brokers 1 by 1 to ZK mode by > removing the {{zookeeper.metadata.migration.enable}} flag and the > controller.* configurations. > # At this point, when we restart the 1st broker (let's call it broker 0) > into ZK mode, we find an issue which occurs ~1 out of 20 times: > If broker 0 is not in the ISR for one of the partitions at this point, it can > simply never become part of the ISR. Since we are aiming for zero downtime, > we check the ISR states of partitions between broker restarts, and our > process gets blocked at this point. We have tried multiple workarounds at > this point, but it seems that there is no workaround which still ensures zero > downtime. > Some more details about the process: > * We are using Strimzi to drive this process, but have verified that Strimzi > follows the documented steps precisely. 
> * When we reach the error state, it doesn't matter which broker became the > controller through the ZK node, the brokers still in migration mode get > stuck, and they flood the logs with the following error: > {code:java} > 2025-02-26 10:55:21,985 WARN [RaftManager id=0] Error connecting to node > kraft-rollback-kafka-controller-pool-5.kraft-rollback-kafka-kafka-brokers.csm-op-test-kraft-rollback-e7798bef.svc.cluster.local:9090 > (id: 5 rack: null) (org.apache.kafka.clients.NetworkClient) > [kafka-raft-outbound-request-thread] > java.net.UnknownHostException: > kraft-rollback-kafka-controller-pool-5.kraft-rollback-kafka-kafka-brokers.csm-op-test-kraft-rollback-e7798bef.svc.cluster.local > at > java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:801) > at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1533) > at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1385) > at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1306) > at > org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27) > at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:125) > at > org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.resolveAddresses(ClusterConnectionStates.java:536) > at > org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:511) > at > org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:466) > at > org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:173) > at > org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:1075) > at > org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:321) > at > org.apache.kafka.server.util.InterBrokerSendThread.sendRequests(InterBrokerSendThread.java:146) > at > org.apache.kafka.server.util.InterBrokerSendThread.pollOnce(InterBrokerSendThread.java:109) > at > org.apache.kafka.server.util.InterBrokerSendThread.doWork(InterBrokerSendThread.java:137) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:136) > {code} > * Manually verified the last offsets of the replicas, and broker 0 is caught > up in the partition. > *
[jira] [Updated] (KAFKA-18871) KRaft migration rollback causes downtime
[ https://issues.apache.org/jira/browse/KAFKA-18871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban updated KAFKA-18871: - Attachment: kraft-rollback-kafka-default-pool-0.zip > KRaft migration rollback causes downtime > > > Key: KAFKA-18871 > URL: https://issues.apache.org/jira/browse/KAFKA-18871 > Project: Kafka > Issue Type: Bug > Components: kraft, migration >Affects Versions: 3.9.0 >Reporter: Daniel Urban >Priority: Critical > Attachments: cluster-operator.log.zip, controller_logs.zip, > kraft-rollback-bug.zip, kraft-rollback-kafka-default-pool-0.zip, > kraft-rollback-kafka-default-pool-1.zip, > kraft-rollback-kafka-default-pool-2.zip, > strimzi-cluster-operator-7bc47d488f-p4ltv.zip > > > When testing the KRaft migration rollback feature, found the following > scenario: > # Execute KRaft migration on a 3 broker 3 ZK node cluster to the last step, > but do not finalize the migration. > ## In the test, we put a slow but continuous produce+consume load on the > cluster, with a topic (partitions=3, RF=3, min ISR=2) > # Start the rollback procedure > # First we roll back the brokers from KRaft mode to migration mode (both > controller and ZK configs are set, process roles are removed, > {{zookeeper.metadata.migration.enable}} is true) > # Then we delete the KRaft controllers, delete the /controller znode > # Then we immediately start rolling the brokers 1 by 1 to ZK mode by > removing the {{zookeeper.metadata.migration.enable}} flag and the > controller.* configurations. > # At this point, when we restart the 1st broker (let's call it broker 0) > into ZK mode, we find an issue which occurs ~1 out of 20 times: > If broker 0 is not in the ISR for one of the partitions at this point, it can > simply never become part of the ISR. Since we are aiming for zero downtime, > we check the ISR states of partitions between broker restarts, and our > process gets blocked at this point. We have tried multiple workarounds at > this point, but it seems that there is no workaround which still ensures zero > downtime. > Some more details about the process: > * We are using Strimzi to drive this process, but have verified that Strimzi > follows the documented steps precisely. 
> * When we reach the error state, it doesn't matter which broker became the > controller through the ZK node, the brokers still in migration mode get > stuck, and they flood the logs with the following error: > {code:java} > 2025-02-26 10:55:21,985 WARN [RaftManager id=0] Error connecting to node > kraft-rollback-kafka-controller-pool-5.kraft-rollback-kafka-kafka-brokers.csm-op-test-kraft-rollback-e7798bef.svc.cluster.local:9090 > (id: 5 rack: null) (org.apache.kafka.clients.NetworkClient) > [kafka-raft-outbound-request-thread] > java.net.UnknownHostException: > kraft-rollback-kafka-controller-pool-5.kraft-rollback-kafka-kafka-brokers.csm-op-test-kraft-rollback-e7798bef.svc.cluster.local > at > java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:801) > at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1533) > at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1385) > at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1306) > at > org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27) > at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:125) > at > org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.resolveAddresses(ClusterConnectionStates.java:536) > at > org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:511) > at > org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:466) > at > org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:173) > at > org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:1075) > at > org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:321) > at > org.apache.kafka.server.util.InterBrokerSendThread.sendRequests(InterBrokerSendThread.java:146) > at > org.apache.kafka.server.util.InterBrokerSendThread.pollOnce(InterBrokerSendThread.java:109) > at > org.apache.kafka.server.util.InterBrokerSendThread.doWork(InterBrokerSendThread.java:137) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:136) > {code} > * Manually verified the last offsets of the replicas, and broker 0 is caught > up in the partition. > * Even after stopping the produce load, the issue persists. > * Even after removing the /controller node manua
[jira] [Commented] (KAFKA-18871) KRaft migration rollback causes downtime
[ https://issues.apache.org/jira/browse/KAFKA-18871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17932017#comment-17932017 ] Daniel Urban commented on KAFKA-18871: -- [~showuon] Managed to reproduce with debug logs, attached new files. If checking the operator logs, look for the csm-op-test-kraft-rollback-89a41d95 namespace. >From the logs you were looking for, found some like this: {code:java} 2025-03-03 15:28:55,343 INFO [Partition kraft-test-topic-1 broker=0] Failed to alter partition to PendingExpandIsr(newInSyncReplicaId=1, sentLeaderAndIsr=LeaderAndIsr(leader=0, leaderEpoch=4, isrWithBrokerEpoch=List(BrokerState(brokerId=2, brokerEpoch=-1), BrokerState(brokerId=0, brokerEpoch=-1), BrokerState(brokerId=1, brokerEpoch=-1)), leaderRecoveryState=RECOVERED, partitionEpoch=7), leaderRecoveryState=RECOVERED, lastCommittedState=CommittedPartitionState(isr=Set(2, 0), leaderRecoveryState=RECOVERED)) since the controller rejected the request with INELIGIBLE_REPLICA. Partition state has been reset to the latest committed state CommittedPartitionState(isr=Set(2, 0), leaderRecoveryState=RECOVERED). (kafka.cluster.Partition) [zk-broker-0-to-controller-alter-partition-channel-manager] {code} > KRaft migration rollback causes downtime > > > Key: KAFKA-18871 > URL: https://issues.apache.org/jira/browse/KAFKA-18871 > Project: Kafka > Issue Type: Bug > Components: kraft, migration >Affects Versions: 3.9.0 >Reporter: Daniel Urban >Priority: Critical > Attachments: cluster-operator.log.zip, controller_logs.zip, > kraft-rollback-bug.zip, kraft-rollback-kafka-default-pool-0.zip, > kraft-rollback-kafka-default-pool-1.zip, > kraft-rollback-kafka-default-pool-2.zip, > strimzi-cluster-operator-7bc47d488f-p4ltv.zip > > > When testing the KRaft migration rollback feature, found the following > scenario: > # Execute KRaft migration on a 3 broker 3 ZK node cluster to the last step, > but do not finalize the migration. > ## In the test, we put a slow but continuous produce+consume load on the > cluster, with a topic (partitions=3, RF=3, min ISR=2) > # Start the rollback procedure > # First we roll back the brokers from KRaft mode to migration mode (both > controller and ZK configs are set, process roles are removed, > {{zookeeper.metadata.migration.enable}} is true) > # Then we delete the KRaft controllers, delete the /controller znode > # Then we immediately start rolling the brokers 1 by 1 to ZK mode by > removing the {{zookeeper.metadata.migration.enable}} flag and the > controller.* configurations. > # At this point, when we restart the 1st broker (let's call it broker 0) > into ZK mode, we find an issue which occurs ~1 out of 20 times: > If broker 0 is not in the ISR for one of the partitions at this point, it can > simply never become part of the ISR. Since we are aiming for zero downtime, > we check the ISR states of partitions between broker restarts, and our > process gets blocked at this point. We have tried multiple workarounds at > this point, but it seems that there is no workaround which still ensures zero > downtime. > Some more details about the process: > * We are using Strimzi to drive this process, but have verified that Strimzi > follows the documented steps precisely. 
> * When we reach the error state, it doesn't matter which broker became the > controller through the ZK node, the brokers still in migration mode get > stuck, and they flood the logs with the following error: > {code:java} > 2025-02-26 10:55:21,985 WARN [RaftManager id=0] Error connecting to node > kraft-rollback-kafka-controller-pool-5.kraft-rollback-kafka-kafka-brokers.csm-op-test-kraft-rollback-e7798bef.svc.cluster.local:9090 > (id: 5 rack: null) (org.apache.kafka.clients.NetworkClient) > [kafka-raft-outbound-request-thread] > java.net.UnknownHostException: > kraft-rollback-kafka-controller-pool-5.kraft-rollback-kafka-kafka-brokers.csm-op-test-kraft-rollback-e7798bef.svc.cluster.local > at > java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:801) > at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1533) > at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1385) > at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1306) > at > org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27) > at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:125) > at > org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.resolveAddresses(ClusterConnectionStates.java:536) > at > org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:511) > at > o
Re: [PR] KAFKA-18142 switch Kafka's Gradle build shadow plugin to `com.gradleup.shadow` (and upgrade plugin version) [kafka]
dejan2609 commented on code in PR #18018: URL: https://github.com/apache/kafka/pull/18018#discussion_r1977801099

## build.gradle:
@@ -1916,6 +1913,10 @@ project(':clients') { generator project(':generator') }
+ tasks.withType(GenerateModuleMetadata) {
+enabled = false

Review Comment: Git commits are rebased onto trunk; I will leave this conversation open (just in case someone wants to do an additional review).
Re: [PR] KAFKA-18876: 4.0 documentation improvement [kafka]
mingdaoy commented on code in PR #19065: URL: https://github.com/apache/kafka/pull/19065#discussion_r1977722899

## docs/ops.html:
@@ -1343,7 +1343,7 @@ Check and wait until the Lag is small for a majority of the controllers. If the leader's end offset is not increasing, you can wait until the lag is 0 for a majority; otherwise, you can pick the latest leader end offset and wait until all replicas have reached it. Check and wait until the LastFetchTimestamp and LastCaughtUpTimestamp are close to each other for the majority of the controllers. At this point it is safer to format the controller's metadata log directory. This can be done by running the kafka-storage.sh command.
- $ bin/kafka-storage.sh format --cluster-id uuid --config server_properties
+ $ bin/kafka-storage.sh format --cluster-id uuid --config config/server_properties

Review Comment: L1352 as well? `bin/kafka-server-start.sh server_properties`
[jira] [Created] (KAFKA-18913) Consider removing state-updater feature flag
Matthias J. Sax created KAFKA-18913:
---

Summary: Consider removing state-updater feature flag
Key: KAFKA-18913
URL: https://issues.apache.org/jira/browse/KAFKA-18913
Project: Kafka
Issue Type: Task
Components: streams
Reporter: Matthias J. Sax
Fix For: 4.1.0

We enabled the new StateUpdater thread with the 3.8 release. We should consider removing the internal feature flag and dropping the old code.
Re: [PR] KAFKA-17516: Synonyms for client metrics configs [kafka]
AndrewJSchofield commented on code in PR #17264: URL: https://github.com/apache/kafka/pull/17264#discussion_r1977096640

## server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java:
@@ -40,18 +40,18 @@ public class ClientMetricsTestUtils {
 public static final String DEFAULT_METRICS = "org.apache.kafka.client.producer.partition.queue.,org.apache.kafka.client.producer.partition.latency";
-public static final int DEFAULT_PUSH_INTERVAL_MS = 30 * 1000; // 30 seconds
-public static final List DEFAULT_CLIENT_MATCH_PATTERNS = List.of(
+public static final int DEFAULT_INTERVAL_MS = 30 * 1000; // 30 seconds

Review Comment: I've changed the naming, but because these are not the true defaults but rather the defaults for the test, I used `_TEST_DEFAULT` in the names.
[PR] Revert "KAFKA-18067: Kafka Streams can leak Producer client under EOS (#17931)" [kafka]
cadonna opened a new pull request, #19078: URL: https://github.com/apache/kafka/pull/19078

This reverts commit e8837465a5fc478f1c79d1ad475b43e00a39a5d7.

The reverted commit prevents Kafka Streams from re-initializing its transactional producer. If an exception that fences the transactional producer occurs, the producer is not re-initialized during the handling of the exception. That causes an infinite loop of ProducerFencedExceptions with corresponding rebalances.
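To illustrate why the revert matters: once a transactional producer is fenced it can never be used again, so the exception-handling path has to close it and create a fresh instance (re-initialize) rather than keep retrying with the fenced one. The sketch below uses the plain producer API with assumed addresses, ids, and topic names; it is not the Kafka Streams-internal code path, only the general recovery pattern.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalRecoverySketch {
    // Hypothetical configuration; transactional.id would normally be derived from the task.
    static KafkaProducer<String, String> newTransactionalProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "example-task-0");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions(); // registers the transactional.id and bumps the epoch
        return producer;
    }

    public static void main(String[] args) {
        KafkaProducer<String, String> producer = newTransactionalProducer();
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("example-topic", "key", "value"));
            producer.commitTransaction();
        } catch (ProducerFencedException fenced) {
            // A fenced instance is permanently unusable: close it and build a new one
            // before doing any further transactional work. Skipping this step is what
            // leads to the endless ProducerFencedException / rebalance loop described above.
            producer.close();
            producer = newTransactionalProducer();
        } finally {
            producer.close();
        }
    }
}
```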
Re: [PR] KAFKA-18844: Stale features information in QuorumController#registerBroker (#18997) [kafka]
FrankYang0529 commented on PR #19058: URL: https://github.com/apache/kafka/pull/19058#issuecomment-2693758227 @dajac, I think we can just cherry-pick it, because there is no conflict.
[jira] [Commented] (KAFKA-18871) KRaft migration rollback causes downtime
[ https://issues.apache.org/jira/browse/KAFKA-18871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17931922#comment-17931922 ] Luke Chen commented on KAFKA-18871: --- [~durban] , thanks for sharing the logs and thanks [~ppatierno] for doing the investigation for Strimzi side. I took a look at the kafka logs, and found 2 problems: # When the ISR update (i.e. AlterPartition request) encounters some errors, it is not retried. I've filed KAFKA-18911 for this issue. # What does the errors ISR update encountered and why does the error happened? I cannot tell from the information I have so far because unfortunately, [the error related logs|https://github.com/apache/kafka/blob/898dcd11ad260e9b3cfefc5291c40e68009acb7d/core/src/main/scala/kafka/cluster/Partition.scala#L1887-L1915] are all in "debug" level. So, even if we fix (1), this issue still occurs because until next metadata update, the ISR state will be in stale(wrong) state. To investigate the root cause of (2), [~durban] , when you're available, could you turn on DEBUG log level for these 2 lines and reproduce this issue? {code:java} log4j.logger.kafka=DEBUG log4j.logger.org.apache.kafka=DEBUG {code} Thank you. > KRaft migration rollback causes downtime > > > Key: KAFKA-18871 > URL: https://issues.apache.org/jira/browse/KAFKA-18871 > Project: Kafka > Issue Type: Bug > Components: kraft, migration >Affects Versions: 3.9.0 >Reporter: Daniel Urban >Priority: Critical > Attachments: cluster-operator.log.zip, kraft-rollback-bug.zip > > > When testing the KRaft migration rollback feature, found the following > scenario: > # Execute KRaft migration on a 3 broker 3 ZK node cluster to the last step, > but do not finalize the migration. > ## In the test, we put a slow but continuous produce+consume load on the > cluster, with a topic (partitions=3, RF=3, min ISR=2) > # Start the rollback procedure > # First we roll back the brokers from KRaft mode to migration mode (both > controller and ZK configs are set, process roles are removed, > {{zookeeper.metadata.migration.enable}} is true) > # Then we delete the KRaft controllers, delete the /controller znode > # Then we immediately start rolling the brokers 1 by 1 to ZK mode by > removing the {{zookeeper.metadata.migration.enable}} flag and the > controller.* configurations. > # At this point, when we restart the 1st broker (let's call it broker 0) > into ZK mode, we find an issue which occurs ~1 out of 20 times: > If broker 0 is not in the ISR for one of the partitions at this point, it can > simply never become part of the ISR. Since we are aiming for zero downtime, > we check the ISR states of partitions between broker restarts, and our > process gets blocked at this point. We have tried multiple workarounds at > this point, but it seems that there is no workaround which still ensures zero > downtime. > Some more details about the process: > * We are using Strimzi to drive this process, but have verified that Strimzi > follows the documented steps precisely. 
> * When we reach the error state, it doesn't matter which broker became the > controller through the ZK node, the brokers still in migration mode get > stuck, and they flood the logs with the following error: > {code:java} > 2025-02-26 10:55:21,985 WARN [RaftManager id=0] Error connecting to node > kraft-rollback-kafka-controller-pool-5.kraft-rollback-kafka-kafka-brokers.csm-op-test-kraft-rollback-e7798bef.svc.cluster.local:9090 > (id: 5 rack: null) (org.apache.kafka.clients.NetworkClient) > [kafka-raft-outbound-request-thread] > java.net.UnknownHostException: > kraft-rollback-kafka-controller-pool-5.kraft-rollback-kafka-kafka-brokers.csm-op-test-kraft-rollback-e7798bef.svc.cluster.local > at > java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:801) > at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1533) > at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1385) > at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1306) > at > org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27) > at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:125) > at > org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.resolveAddresses(ClusterConnectionStates.java:536) > at > org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:511) > at > org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:466) > at > org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:173) > a
Re: [PR] KAFKA-18142 switch Kafka's Gradle build shadow plugin to `com.gradleup.shadow` (and upgrade plugin version) [kafka]
apoorvmittal10 commented on code in PR #18018: URL: https://github.com/apache/kafka/pull/18018#discussion_r1977242641

## build.gradle:
@@ -1916,6 +1913,10 @@ project(':clients') { generator project(':generator') }
+ tasks.withType(GenerateModuleMetadata) {
+enabled = false

Review Comment: Yeah, I agree it makes sense to do things step by step. This PR is an improvement over our current shadow plugin and a fix for releasing signed libs. We should merge it and address the `module` metadata concern defined [here](https://github.com/apache/kafka/pull/18018#pullrequestreview-2638667585) separately.
[jira] [Commented] (KAFKA-18911) alterPartition gets stuck when getting out-of-date errors
[ https://issues.apache.org/jira/browse/KAFKA-18911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17931923#comment-17931923 ] Luke Chen commented on KAFKA-18911: --- Had another look, it looks like we're OK to stay in this state until we see new metadata from LeaderAndIsr (or an update to the KRaft metadata log). Closing it now. > alterPartition gets stuck when getting out-of-date errors > - > > Key: KAFKA-18911 > URL: https://issues.apache.org/jira/browse/KAFKA-18911 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.9.0 >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > > When the leader node sends the AlterPartition request to the controller, the > controller will do [some > validation|https://github.com/apache/kafka/blob/898dcd11ad260e9b3cfefc5291c40e68009acb7d/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java#L1231] > before processing it. And in the leader node side, when receiving the > errors, we'll decide if it should be retried or not > [here|https://github.com/apache/kafka/blob/898dcd11ad260e9b3cfefc5291c40e68009acb7d/core/src/main/scala/kafka/cluster/Partition.scala#L1868]. > However, in some non-retry cases, we directly return false without changing > the state: > > {code:java} > case Errors.UNKNOWN_TOPIC_OR_PARTITION => > info(s"Failed to alter partition to $proposedIsrState since the controller > doesn't know about " + > "this topic or partition. Partition state may be out of sync, awaiting > new the latest metadata.") > false > case Errors.UNKNOWN_TOPIC_ID => > info(s"Failed to alter partition to $proposedIsrState since the controller > doesn't know about " + > "this topic. Partition state may be out of sync, awaiting new the latest > metadata.") > false > case Errors.FENCED_LEADER_EPOCH => > info(s"Failed to alter partition to $proposedIsrState since the leader > epoch is old. " + > "Partition state may be out of sync, awaiting new the latest metadata.") > false > case Errors.INVALID_UPDATE_VERSION => > info(s"Failed to alter partition to $proposedIsrState because the partition > epoch is invalid. " + > "Partition state may be out of sync, awaiting new the latest metadata.") > false > case Errors.INVALID_REQUEST => > info(s"Failed to alter partition to $proposedIsrState because the request > is invalid. " + > "Partition state may be out of sync, awaiting new the latest metadata.") > false > case Errors.NEW_LEADER_ELECTED => > // The operation completed successfully but this replica got removed from > the replica set by the controller > // while completing a ongoing reassignment. This replica is no longer the > leader but it does not know it > // yet. It should remain in the current pending state until the metadata > overrides it. > // This is only raised in KRaft mode. > info(s"The alter partition request successfully updated the partition state > to $proposedIsrState but " + > "this replica got removed from the replica set while completing a > reassignment. " + > "Waiting on new metadata to clean up this replica.") > false{code} > As we said in the log, "Partition state may be out of sync, awaiting new the > latest metadata". But without updating the partition state means it will > stays at `PendingExpandIsr` or `PendingShrinkIsr` state, which keeps the > `isInflight` to true. Under this state, the partition state will never be > updated anymore. > > The impact of this issue is that the ISR state will be in stale(wrong) state > until leadership change. 
-- This message was sent by Atlassian Jira (v8.20.10#820010)
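A minimal sketch of the state handling discussed above (hypothetical names, not the actual Partition.scala code): while a partition sits in a pending ISR state, isInflight stays true and no further AlterPartition request is sent until new metadata resets the state, which is why the resolution above relies on the next metadata update.
{code:java}
// Illustrative only: a pending ISR state keeps isInflight == true, so no new
// AlterPartition request is sent until fresh metadata resets the state.
enum IsrState { COMMITTED, PENDING_EXPAND_ISR, PENDING_SHRINK_ISR }

class PartitionSketch {
    private IsrState state = IsrState.COMMITTED;

    boolean isInflight() {
        return state == IsrState.PENDING_EXPAND_ISR || state == IsrState.PENDING_SHRINK_ISR;
    }

    void maybeSendAlterPartition() {
        if (isInflight()) {
            return; // nothing is sent while a request is considered in flight
        }
        state = IsrState.PENDING_EXPAND_ISR; // request sent, waiting for the controller
    }

    // Mirrors the non-retriable branches quoted above: return false without touching the state,
    // so isInflight() remains true.
    boolean handleNonRetriableError() {
        return false;
    }

    // The resolution of this ticket: new metadata (LeaderAndIsr or the KRaft metadata log)
    // eventually resets the pending state.
    void applyNewMetadata() {
        state = IsrState.COMMITTED;
    }
}
{code}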
[jira] [Commented] (KAFKA-18871) KRaft migration rollback causes downtime
[ https://issues.apache.org/jira/browse/KAFKA-18871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17931936#comment-17931936 ] Daniel Urban commented on KAFKA-18871: -- [~showuon] Thanks for looking into it - will try to reproduce with debug logs, get back to you soon. > KRaft migration rollback causes downtime > > > Key: KAFKA-18871 > URL: https://issues.apache.org/jira/browse/KAFKA-18871 > Project: Kafka > Issue Type: Bug > Components: kraft, migration >Affects Versions: 3.9.0 >Reporter: Daniel Urban >Priority: Critical > Attachments: cluster-operator.log.zip, kraft-rollback-bug.zip > > > When testing the KRaft migration rollback feature, found the following > scenario: > # Execute KRaft migration on a 3 broker 3 ZK node cluster to the last step, > but do not finalize the migration. > ## In the test, we put a slow but continuous produce+consume load on the > cluster, with a topic (partitions=3, RF=3, min ISR=2) > # Start the rollback procedure > # First we roll back the brokers from KRaft mode to migration mode (both > controller and ZK configs are set, process roles are removed, > {{zookeeper.metadata.migration.enable}} is true) > # Then we delete the KRaft controllers, delete the /controller znode > # Then we immediately start rolling the brokers 1 by 1 to ZK mode by > removing the {{zookeeper.metadata.migration.enable}} flag and the > controller.* configurations. > # At this point, when we restart the 1st broker (let's call it broker 0) > into ZK mode, we find an issue which occurs ~1 out of 20 times: > If broker 0 is not in the ISR for one of the partitions at this point, it can > simply never become part of the ISR. Since we are aiming for zero downtime, > we check the ISR states of partitions between broker restarts, and our > process gets blocked at this point. We have tried multiple workarounds at > this point, but it seems that there is no workaround which still ensures zero > downtime. > Some more details about the process: > * We are using Strimzi to drive this process, but have verified that Strimzi > follows the documented steps precisely. 
> * When we reach the error state, it doesn't matter which broker became the > controller through the ZK node, the brokers still in migration mode get > stuck, and they flood the logs with the following error: > {code:java} > 2025-02-26 10:55:21,985 WARN [RaftManager id=0] Error connecting to node > kraft-rollback-kafka-controller-pool-5.kraft-rollback-kafka-kafka-brokers.csm-op-test-kraft-rollback-e7798bef.svc.cluster.local:9090 > (id: 5 rack: null) (org.apache.kafka.clients.NetworkClient) > [kafka-raft-outbound-request-thread] > java.net.UnknownHostException: > kraft-rollback-kafka-controller-pool-5.kraft-rollback-kafka-kafka-brokers.csm-op-test-kraft-rollback-e7798bef.svc.cluster.local > at > java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:801) > at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1533) > at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1385) > at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1306) > at > org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27) > at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:125) > at > org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.resolveAddresses(ClusterConnectionStates.java:536) > at > org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:511) > at > org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:466) > at > org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:173) > at > org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:1075) > at > org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:321) > at > org.apache.kafka.server.util.InterBrokerSendThread.sendRequests(InterBrokerSendThread.java:146) > at > org.apache.kafka.server.util.InterBrokerSendThread.pollOnce(InterBrokerSendThread.java:109) > at > org.apache.kafka.server.util.InterBrokerSendThread.doWork(InterBrokerSendThread.java:137) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:136) > {code} > * Manually verified the last offsets of the replicas, and broker 0 is caught > up in the partition. > * Even after stopping the produce load, the issue persists. > * Even after removing the /controller node manually (to retrigger election), > regardless of which broker becomes the controller, the issue persis
[jira] [Commented] (KAFKA-18871) KRaft migration rollback causes downtime
[ https://issues.apache.org/jira/browse/KAFKA-18871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17931925#comment-17931925 ] Luke Chen commented on KAFKA-18871: --- Had another look for (1), it is not a problem now since if the new metadata is updated, the state will be refreshed. So we should focus on (2). Thanks. > KRaft migration rollback causes downtime > > > Key: KAFKA-18871 > URL: https://issues.apache.org/jira/browse/KAFKA-18871 > Project: Kafka > Issue Type: Bug > Components: kraft, migration >Affects Versions: 3.9.0 >Reporter: Daniel Urban >Priority: Critical > Attachments: cluster-operator.log.zip, kraft-rollback-bug.zip > > > When testing the KRaft migration rollback feature, found the following > scenario: > # Execute KRaft migration on a 3 broker 3 ZK node cluster to the last step, > but do not finalize the migration. > ## In the test, we put a slow but continuous produce+consume load on the > cluster, with a topic (partitions=3, RF=3, min ISR=2) > # Start the rollback procedure > # First we roll back the brokers from KRaft mode to migration mode (both > controller and ZK configs are set, process roles are removed, > {{zookeeper.metadata.migration.enable}} is true) > # Then we delete the KRaft controllers, delete the /controller znode > # Then we immediately start rolling the brokers 1 by 1 to ZK mode by > removing the {{zookeeper.metadata.migration.enable}} flag and the > controller.* configurations. > # At this point, when we restart the 1st broker (let's call it broker 0) > into ZK mode, we find an issue which occurs ~1 out of 20 times: > If broker 0 is not in the ISR for one of the partitions at this point, it can > simply never become part of the ISR. Since we are aiming for zero downtime, > we check the ISR states of partitions between broker restarts, and our > process gets blocked at this point. We have tried multiple workarounds at > this point, but it seems that there is no workaround which still ensures zero > downtime. > Some more details about the process: > * We are using Strimzi to drive this process, but have verified that Strimzi > follows the documented steps precisely. 
> * When we reach the error state, it doesn't matter which broker became the > controller through the ZK node, the brokers still in migration mode get > stuck, and they flood the logs with the following error: > {code:java} > 2025-02-26 10:55:21,985 WARN [RaftManager id=0] Error connecting to node > kraft-rollback-kafka-controller-pool-5.kraft-rollback-kafka-kafka-brokers.csm-op-test-kraft-rollback-e7798bef.svc.cluster.local:9090 > (id: 5 rack: null) (org.apache.kafka.clients.NetworkClient) > [kafka-raft-outbound-request-thread] > java.net.UnknownHostException: > kraft-rollback-kafka-controller-pool-5.kraft-rollback-kafka-kafka-brokers.csm-op-test-kraft-rollback-e7798bef.svc.cluster.local > at > java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:801) > at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1533) > at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1385) > at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1306) > at > org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27) > at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:125) > at > org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.resolveAddresses(ClusterConnectionStates.java:536) > at > org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:511) > at > org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:466) > at > org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:173) > at > org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:1075) > at > org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:321) > at > org.apache.kafka.server.util.InterBrokerSendThread.sendRequests(InterBrokerSendThread.java:146) > at > org.apache.kafka.server.util.InterBrokerSendThread.pollOnce(InterBrokerSendThread.java:109) > at > org.apache.kafka.server.util.InterBrokerSendThread.doWork(InterBrokerSendThread.java:137) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:136) > {code} > * Manually verified the last offsets of the replicas, and broker 0 is caught > up in the partition. > * Even after stopping the produce load, the issue persists. > * Even after removing the /controller node manually (to retrigger election), > regardless of which br
[jira] [Reopened] (KAFKA-18067) Kafka Streams can leak Producer client under EOS
[ https://issues.apache.org/jira/browse/KAFKA-18067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna reopened KAFKA-18067: --- We had to revert the fix for this bug (https://github.com/apache/kafka/pull/19078) because it introduces a blocking bug for AK 4.0. The issue is that the fix prevented Kafka Streams from re-initializing its transactional producer under exactly-once semantics. That led to an infinite loop of {{ProducerFencedException}}s with corresponding rebalances. For example: # 1 A network partitions happens that causes the timeout of a transaction. # 2 The transactional producer is fenced due to invalid producer epoch. # 3 Kafka Streams closes the tasks dirty and re-joins the group, i.e., a rebalance is triggered. # 4 The transactional producer is NOT re-initialized and does NOT get a new producer epoch. # 5 Processing starts but the transactional producer is immediately fenced during the attempt to start a new transaction because of the invalid producer epoch. # step 3 is repeated > Kafka Streams can leak Producer client under EOS > > > Key: KAFKA-18067 > URL: https://issues.apache.org/jira/browse/KAFKA-18067 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: TengYao Chi >Priority: Major > Labels: newbie, newbie++ > Fix For: 4.0.0 > > > Under certain conditions Kafka Streams can end up closing a producer client > twice and creating a new one that then is never closed. > During a StreamThread's shutdown, the TaskManager is closed first, through > which the thread's producer client is also closed. Later on we call > #unsubscribe on the main consumer, which can result in the #onPartitionsLost > callback being invoked and ultimately trying to reset/reinitialize the > StreamsProducer if EOS is enabled. This in turn includes closing the current > producer and creating a new one. And since the current producer was already > closed, we end up closing that client twice and never closing the newly > created producer. > Ideally we would just skip the reset/reinitialize process entirely when > invoked during shutdown. This solves the two problems here (leaked client and > double close), while also removing the unnecessary overhead of creating an > entirely new client just to throw it away -- This message was sent by Atlassian Jira (v8.20.10#820010)
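For context on step 4 above, here is a rough sketch (not the actual Kafka Streams StreamsProducer code) of what re-initializing the transactional producer after a fence looks like with the plain producer API. Skipping the re-initialization keeps the stale producer epoch, so the next beginTransaction() is fenced again, which is the loop described above.
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.errors.ProducerFencedException;

class TxnProducerResetSketch {
    private final Properties props;   // bootstrap.servers, serializers, transactional.id, ...
    private Producer<byte[], byte[]> producer;

    TxnProducerResetSketch(Properties props) {
        this.props = props;
        this.producer = newInitializedProducer();
    }

    private Producer<byte[], byte[]> newInitializedProducer() {
        Producer<byte[], byte[]> p = new KafkaProducer<>(props);
        p.initTransactions(); // obtains a fresh producer id/epoch from the coordinator
        return p;
    }

    void runOneTransaction() {
        try {
            producer.beginTransaction();
            // ... send records, then commitTransaction() ...
        } catch (ProducerFencedException fenced) {
            // Without this close-and-recreate step the stale epoch is kept and every
            // subsequent beginTransaction() is fenced again.
            producer.close();
            producer = newInitializedProducer();
        }
    }
}
{code}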
[jira] [Updated] (KAFKA-18067) Kafka Streams can leak Producer client under EOS
[ https://issues.apache.org/jira/browse/KAFKA-18067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna updated KAFKA-18067: -- Fix Version/s: (was: 4.0.0) > Kafka Streams can leak Producer client under EOS > > > Key: KAFKA-18067 > URL: https://issues.apache.org/jira/browse/KAFKA-18067 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: TengYao Chi >Priority: Major > Labels: newbie, newbie++ > > Under certain conditions Kafka Streams can end up closing a producer client > twice and creating a new one that then is never closed. > During a StreamThread's shutdown, the TaskManager is closed first, through > which the thread's producer client is also closed. Later on we call > #unsubscribe on the main consumer, which can result in the #onPartitionsLost > callback being invoked and ultimately trying to reset/reinitialize the > StreamsProducer if EOS is enabled. This in turn includes closing the current > producer and creating a new one. And since the current producer was already > closed, we end up closing that client twice and never closing the newly > created producer. > Ideally we would just skip the reset/reinitialize process entirely when > invoked during shutdown. This solves the two problems here (leaked client and > double close), while also removing the unnecessary overhead of creating an > entirely new client just to throw it away -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-18843: Fix MirrorMaker2 workerId is not unique, but use the sam… [kafka]
k0b3rIT commented on PR #18994: URL: https://github.com/apache/kafka/pull/18994#issuecomment-2693628006 @viktorsomogyi @C0urante Could you please check this fix when you have free capacity? Thank you! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Re: [PR] KAFKA-18142 switch Kafka's Gradle build shadow plugin to `com.gradleup.shadow` (and upgrade plugin version) [kafka]
dejan2609 commented on code in PR #18018: URL: https://github.com/apache/kafka/pull/18018#discussion_r1977114543 ## build.gradle: ## @@ -353,17 +353,14 @@ subprojects { if (!shouldPublishWithShadow) { from components.java } else { -apply plugin: 'io.github.goooler.shadow' -project.shadow.component(mavenJava) +apply plugin: 'com.gradleup.shadow' +from components.shadow // Fix for avoiding inclusion of runtime dependencies marked as 'shadow' in MANIFEST Class-Path. -// https://github.com/johnrengelman/shadow/issues/324 +// https://github.com/GradleUp/shadow/issues/324 afterEvaluate { Review Comment: @Goooler I have this on my mind and will (probably) address in a separate PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18844: Stale features information in QuorumController#registerBroker (#18997) [kafka]
dajac commented on PR #19058: URL: https://github.com/apache/kafka/pull/19058#issuecomment-2693750609 @FrankYang0529 Do we also need a PR for 4.0 or was it already cherry-picked? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-10864: Convert end txn marker schema to use auto-generated protocol [kafka]
dengziming commented on code in PR #9766: URL: https://github.com/apache/kafka/pull/9766#discussion_r1977186279 ## clients/src/main/java/org/apache/kafka/common/record/EndTransactionMarker.java: ## @@ -95,32 +76,35 @@ public int hashCode() { private static void ensureTransactionMarkerControlType(ControlRecordType type) { if (type != ControlRecordType.COMMIT && type != ControlRecordType.ABORT) -throw new IllegalArgumentException("Invalid control record type for end transaction marker" + type); +throw new IllegalArgumentException("Invalid control record type for end transaction marker " + type); } public static EndTransactionMarker deserialize(Record record) { ControlRecordType type = ControlRecordType.parse(record.key()); return deserializeValue(type, record.value()); } +// Visible for testing static EndTransactionMarker deserializeValue(ControlRecordType type, ByteBuffer value) { ensureTransactionMarkerControlType(type); -if (value.remaining() < CURRENT_END_TXN_MARKER_VALUE_SIZE) -throw new InvalidRecordException("Invalid value size found for end transaction marker. Must have " + -"at least " + CURRENT_END_TXN_MARKER_VALUE_SIZE + " bytes, but found only " + value.remaining()); - -short version = value.getShort(0); -if (version < 0) +short version = value.getShort(); +if (version < EndTxnMarker.LOWEST_SUPPORTED_VERSION) throw new InvalidRecordException("Invalid version found for end transaction marker: " + version + ". May indicate data corruption"); -if (version > CURRENT_END_TXN_MARKER_VERSION) +if (version > EndTxnMarker.HIGHEST_SUPPORTED_VERSION) log.debug("Received end transaction marker value version {}. Parsing as version {}", version, -CURRENT_END_TXN_MARKER_VERSION); +EndTxnMarker.HIGHEST_SUPPORTED_VERSION); +EndTxnMarker marker = new EndTxnMarker(new ByteBufferAccessor(value), version); +return new EndTransactionMarker(type, marker.coordinatorEpoch()); +} -int coordinatorEpoch = value.getInt(2); -return new EndTransactionMarker(type, coordinatorEpoch); +public int endTxnMarkerValueSize() { Review Comment: Yes, it's good to test it since it's public. ## clients/src/test/java/org/apache/kafka/common/record/EndTransactionMarkerTest.java: ## @@ -17,16 +17,30 @@ package org.apache.kafka.common.record; import org.apache.kafka.common.InvalidRecordException; +import org.apache.kafka.common.message.EndTxnMarker; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.protocol.types.Type; import org.junit.jupiter.api.Test; import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; public class EndTransactionMarkerTest { +// Old hard-coded schema, used to validate old hard-coded schema format is exactly the same as new auto generated protocol format +private Schema v0Schema = new Schema( Review Comment: done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Extract HeartbeatRequestState from heartbeat request managers [kafka]
cadonna merged PR #19043: URL: https://github.com/apache/kafka/pull/19043 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14484: Move UnifiedLog to storage module [kafka]
mimaison commented on code in PR #19030: URL: https://github.com/apache/kafka/pull/19030#discussion_r1977276265 ## storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java: ## @@ -16,44 +16,2351 @@ */ package org.apache.kafka.storage.internals.log; +import org.apache.kafka.common.InvalidRecordException; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.errors.InconsistentTopicIdException; +import org.apache.kafka.common.errors.InvalidTxnStateException; +import org.apache.kafka.common.errors.KafkaStorageException; +import org.apache.kafka.common.errors.OffsetOutOfRangeException; +import org.apache.kafka.common.errors.RecordBatchTooLargeException; +import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.DescribeProducersResponseData; +import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MutableRecordBatch; +import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.RecordValidationStats; +import org.apache.kafka.common.record.RecordVersion; import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.PrimitiveRef; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.OffsetAndEpoch; +import org.apache.kafka.server.common.RequestLocal; import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig; +import org.apache.kafka.server.metrics.KafkaMetricsGroup; +import org.apache.kafka.server.record.BrokerCompressionType; +import org.apache.kafka.server.storage.log.FetchIsolation; +import org.apache.kafka.server.storage.log.UnexpectedAppendOffsetException; import org.apache.kafka.server.util.Scheduler; import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; +import org.apache.kafka.storage.internals.checkpoint.PartitionMetadataFile; import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics; +import org.apache.kafka.storage.log.metrics.BrokerTopicStats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Predicate; +import 
java.util.stream.Collectors; +import java.util.stream.StreamSupport; -public class UnifiedLog { +import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET; + +/** + * A log which presents a unified view of local and tiered log segments. + * + * The log consists of tiered and local segments with the tiered portion of the log being optional. There could be an + * overlap between the tiered and local segments. The active segment is always guaranteed to be local. If tiered segments + * are present, they always appear at the beginning of the log, followed by an optional region of overlap, followed by the local + * segments including the active segment. + * + * NOTE: this class handles state and behavior specific to tiered segments as well as any behavior combining both tiered + * and local segments. The state and behavior specific to local segments are handled by the encapsulated LocalLog instance. + */ +public class UnifiedLog implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(UnifiedLog.class); -public static final String LOG_FILE_SUFFIX = LogFileUtils.LOG_FILE_SUFFIX; -public static final String INDEX_FILE_SUFFIX = LogFileUtils.INDEX_FILE_SUFFIX; -public static final String TIME_INDEX_FILE_SUFFIX = LogFileUtils.TIME_INDEX_FILE_SUFFIX; -public static final String TXN_INDEX_FILE_SUFFIX = Lo
[jira] [Commented] (KAFKA-17808) InstanceAlreadyExistsException: kafka.admin.client:type=app-info,id=connector-dlq-adminclient- when add connector with tasks
[ https://issues.apache.org/jira/browse/KAFKA-17808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17931929#comment-17931929 ] Muralidhar Basani commented on KAFKA-17808: --- [~yung] thanks for asking. It would be great if you can take over. Currently stuck with other things. > InstanceAlreadyExistsException: > kafka.admin.client:type=app-info,id=connector-dlq-adminclient- when add > connector with tasks > > > Key: KAFKA-17808 > URL: https://issues.apache.org/jira/browse/KAFKA-17808 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: ignat233 >Assignee: Greg Harris >Priority: Major > Attachments: image-2024-10-16-13-00-36-667.png > > > Why do we always create an admin client with the same > "connector-dlq-adminclient-" value id? > [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1008] > For all other cases, a postfix is added. > !image-2024-10-16-13-00-36-667.png! > I get "Error registering AppInfo mbean > javax.management.InstanceAlreadyExistsException: > kafka.admin.client:type=app-info,id=connector-dlq-adminclient-." error for > all tasks. > It looks like the ConnectorTaskId should be added. -- This message was sent by Atlassian Jira (v8.20.10#820010)
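The fix suggested in the last sentence above amounts to making the DLQ admin client's client.id unique per task. A hedged sketch follows; the AdminClientConfig property is real, but the id composition is only illustrative and not the actual Worker.java change.
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

class DlqAdminClientIdSketch {
    // Illustrative only: append the connector name and task id so the
    // app-info MBean name is unique per DLQ admin client.
    static Admin createDlqAdmin(Properties base, String connectorName, int taskId) {
        Properties props = new Properties();
        props.putAll(base);
        props.put(AdminClientConfig.CLIENT_ID_CONFIG,
                "connector-dlq-adminclient-" + connectorName + "-" + taskId);
        return Admin.create(props);
    }
}
{code}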
Re: [PR] KAFKA-17808 : Append connector name to admin client id - dlq sink reporter [kafka]
muralibasani commented on PR #17538: URL: https://github.com/apache/kafka/pull/17538#issuecomment-2693947539 > @muralibasani Thanks for the patch. Do you have time to revisit this? Otherwise, I'd like to continue this PR and add some tests. @Yunyung thank you, and please take over. That would be great. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] HOTFIX: Revert "KAFKA-18067: Kafka Streams can leak Producer client under EOS (#17931)" [kafka]
cadonna merged PR #19078: URL: https://github.com/apache/kafka/pull/19078 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-18904) Listing of configs for dynamically created resources is mysterious
[ https://issues.apache.org/jira/browse/KAFKA-18904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17931909#comment-17931909 ] Andrew Schofield commented on KAFKA-18904: -- [~yangpoan]Absolutely, please go ahead. > Listing of configs for dynamically created resources is mysterious > -- > > Key: KAFKA-18904 > URL: https://issues.apache.org/jira/browse/KAFKA-18904 > Project: Kafka > Issue Type: Improvement >Reporter: Andrew Schofield >Assignee: PoAn Yang >Priority: Major > Labels: needs-kip > > The `kafka-configs.sh` tool can be used to set configurations on dynamically > created resources such as groups and client metrics. However, the way that > listing of the configs works is unhelpful. > bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --group G1 > --add-config consumer.heartbeat.interval.ms=1 > * This defines the config consumer.heartbeat.interval.ms > bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe > --entity-type groups > * This only describes the configs of groups that actually exist, as will > happen when the group actually has started being used. > bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe > --entity-type groups --entity-name G1 > * This actually displays the configs for G1. > The problem is that using `--describe` with no entity name, the tool lists > the resources (the groups) not the configs. As a result, if you define > configs in preparation for the use of groups in the future, you need to > remember what you created. You cannot list the groups for which configs are > defined, only the groups which actually exist from the point of view of the > group coordinator. > Client metrics are a bit better because there is at least an RPC for listing > the client metrics resources. > There is a second class of related problem. > bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe > --entity-type groups --entity-name DOESNOTEXIST > * This does not return an error for a non-existent resource. > bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe > --entity-type client-metrics --entity-name DOESNOTEXIST > * This does not return an error for a non-existent resource. -- This message was sent by Atlassian Jira (v8.20.10#820010)
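For comparison, the same describe against a named group can be done through the Admin API. The sketch below assumes a client version that has ConfigResource.Type.GROUP; as the ticket notes, it only helps when you already know the group name, since there is no way to list the groups for which configs exist.
{code:java}
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

class DescribeGroupConfigSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Describe the configs of group G1, matching the kafka-configs.sh example above.
            ConfigResource group = new ConfigResource(ConfigResource.Type.GROUP, "G1");
            Config config = admin.describeConfigs(List.of(group)).all().get().get(group);
            config.entries().forEach(e -> System.out.println(e.name() + " = " + e.value()));
        }
    }
}
{code}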
Re: [PR] KAFKA-18887 Implement Streams Admin APIs [kafka]
lucasbru commented on code in PR #19049: URL: https://github.com/apache/kafka/pull/19049#discussion_r1977177176 ## clients/src/main/java/org/apache/kafka/clients/admin/DeleteStreamsGroupOffsetsResult.java: ## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.protocol.Errors; + +import java.util.Map; +import java.util.Set; + +/** + * The result of the {@link Admin#deleteConsumerGroupOffsets(String, Set)} call. + * + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class DeleteStreamsGroupOffsetsResult extends DeleteConsumerGroupOffsetsResult { Review Comment: I don't think we should use inheritance here. There is no subtyping relationship here really. Also, technically this is a change to the public API that needs to be changed in the KIP. I understand you want to avoid reimplementing the class. Have you considered using delegation? That is, the (package-protected) contructor for this class takes a DeleteConsumerGroupOffsetsResult and delegates calls to partitionResult and all to it? ## clients/src/main/java/org/apache/kafka/clients/admin/AlterStreamsGroupOffsetsResult.java: ## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.protocol.Errors; + +import java.util.Map; + +/** + * The result of the {@link AdminClient#alterConsumerGroupOffsets(String, Map)} call. 
Review Comment: This is the result of alterStreamsGroupOffsets ## clients/src/main/java/org/apache/kafka/clients/admin/Admin.java: ## @@ -945,6 +945,29 @@ default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Maphttp://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Map; + +/** + * Options for the {@link AdminClient#alterStreamsGroupOffsets(String, Map, AlterStreamsGroupOffsetsOptions)} call. + * + * The API of this class is evolving, see {@link AdminClient} for details. + */ +@InterfaceStability.Evolving +public class AlterStreamsGroupOffsetsOptions extends AlterConsumerGroupOffsetsOptions { Review Comment: I would directly derive from `AbstractOptions` here. Similar for the other `option` classes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
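A rough sketch of the delegation alternative suggested in the first review comment above (hypothetical shape, not the actual PR code): the streams result class wraps a DeleteConsumerGroupOffsetsResult instead of extending it and forwards the two accessors, so no subtyping relationship is exposed in the public API.
{code:java}
import org.apache.kafka.clients.admin.DeleteConsumerGroupOffsetsResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch: delegation instead of inheritance.
public class DeleteStreamsGroupOffsetsResult {
    private final DeleteConsumerGroupOffsetsResult delegate;

    // Package-protected constructor takes the consumer-group result and delegates to it.
    DeleteStreamsGroupOffsetsResult(DeleteConsumerGroupOffsetsResult delegate) {
        this.delegate = delegate;
    }

    public KafkaFuture<Void> partitionResult(TopicPartition partition) {
        return delegate.partitionResult(partition);
    }

    public KafkaFuture<Void> all() {
        return delegate.all();
    }
}
{code}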
Re: [PR] KAFKA-18142 switch Kafka's Gradle build shadow plugin to `com.gradleup.shadow` (and upgrade plugin version) [kafka]
dejan2609 commented on code in PR #18018: URL: https://github.com/apache/kafka/pull/18018#discussion_r1977193479 ## build.gradle: ## @@ -1916,6 +1913,10 @@ project(':clients') { generator project(':generator') } + tasks.withType(GenerateModuleMetadata) { +enabled = false Review Comment: @apoorvmittal10 It started with this comment made by @poom-kitti here: https://github.com/apache/kafka/pull/18018#issuecomment-2637629283 Let me digest what is going on here just by using `kafka-clients` as an external projects dependency. Scenario: - create a new project **_someNewProject_** and use `kafka-clients` as dependency - from `kafka` project: create three `kafka-clients` versions (using three git commits, see below) and publish them into local maven repository - execute `./gradlew dependencies` against **_someNewProject_** (while rotating `kafka-clients` versions created in a previous step) Results for **_someNewProject_** `./gradlew dependencies` command (and three different `kafka-clients` versions): 1. Kafka trunk (with Gradle module metadata enabled by a default): ``` runtimeClasspath - Runtime classpath of source set 'main'. \--- org.apache.kafka:kafka-clients:4.1.0-SNAPSHOT-test-no-gradle-module +--- com.github.luben:zstd-jni:1.5.6-6 +--- org.lz4:lz4-java:1.8.0 +--- org.xerial.snappy:snappy-java:1.1.10.5 \--- org.slf4j:slf4j-api:1.7.36 ``` 2. this PR HEAD~1 commit (with a new shadow plugin **AND** with Gradle module metadata enabled by default): ``` runtimeClasspath - Runtime classpath of source set 'main'. \--- org.apache.kafka:kafka-clients:4.1.0-SNAPSHOT-test ``` 3. this PR HEAD comit (with new shadow plugin AND with Gradle module metadata explicitly disabled): ``` runtimeClasspath - Runtime classpath of source set 'main'. \--- org.apache.kafka:kafka-clients:4.1.0-SNAPSHOT-test-no-gradle-module +--- com.github.luben:zstd-jni:1.5.6-6 +--- org.lz4:lz4-java:1.8.0 +--- org.xerial.snappy:snappy-java:1.1.10.5 \--- org.slf4j:slf4j-api:1.7.36 ``` (see my comment https://github.com/apache/kafka/pull/18018#issuecomment-2645879873 for more details about testing) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18817:[1/N] ShareGroupHeartbeat and ShareGroupDescribe API must check topic describe [kafka]
AndrewJSchofield merged PR #19055: URL: https://github.com/apache/kafka/pull/19055 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Disallow unused local variables [kafka]
lucasbru commented on code in PR #18963: URL: https://github.com/apache/kafka/pull/18963#discussion_r1977199932 ## clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java: ## @@ -397,7 +397,7 @@ public void testStressfulSituation() throws Exception { List batches = accum.drain(metadataCache, nodes, 5 * 1024, 0).get(node1.id()); if (batches != null) { for (ProducerBatch batch : batches) { -for (Record record : batch.records().records()) +for (@SuppressWarnings("UnusedLocalVariable") Record ignored : batch.records().records()) read++; Review Comment: I think the point of the test is actually simulating consumption of the batch in a scenario where multiple threads append to the accumulator. So we want to actually access the `MemoryRecords`, not just test the record counter. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-18911) alterPartition gets stuck when getting out-of-date errors
Luke Chen created KAFKA-18911: - Summary: alterPartition gets stuck when getting out-of-date errors Key: KAFKA-18911 URL: https://issues.apache.org/jira/browse/KAFKA-18911 Project: Kafka Issue Type: Bug Affects Versions: 3.9.0 Reporter: Luke Chen Assignee: Luke Chen When the leader node sends the AlterPartition request to the controller, the controller will do [some validation|https://github.com/apache/kafka/blob/898dcd11ad260e9b3cfefc5291c40e68009acb7d/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java#L1231] before processing it. And in the leader node side, when receiving the errors, we'll decide if it should be retried or not [here|https://github.com/apache/kafka/blob/898dcd11ad260e9b3cfefc5291c40e68009acb7d/core/src/main/scala/kafka/cluster/Partition.scala#L1868]. However, in some non-retry cases, we directly return false without changing the state: {code:java} case Errors.UNKNOWN_TOPIC_OR_PARTITION => info(s"Failed to alter partition to $proposedIsrState since the controller doesn't know about " + "this topic or partition. Partition state may be out of sync, awaiting new the latest metadata.") false case Errors.UNKNOWN_TOPIC_ID => info(s"Failed to alter partition to $proposedIsrState since the controller doesn't know about " + "this topic. Partition state may be out of sync, awaiting new the latest metadata.") false case Errors.FENCED_LEADER_EPOCH => info(s"Failed to alter partition to $proposedIsrState since the leader epoch is old. " + "Partition state may be out of sync, awaiting new the latest metadata.") false case Errors.INVALID_UPDATE_VERSION => info(s"Failed to alter partition to $proposedIsrState because the partition epoch is invalid. " + "Partition state may be out of sync, awaiting new the latest metadata.") false case Errors.INVALID_REQUEST => info(s"Failed to alter partition to $proposedIsrState because the request is invalid. " + "Partition state may be out of sync, awaiting new the latest metadata.") false case Errors.NEW_LEADER_ELECTED => // The operation completed successfully but this replica got removed from the replica set by the controller // while completing a ongoing reassignment. This replica is no longer the leader but it does not know it // yet. It should remain in the current pending state until the metadata overrides it. // This is only raised in KRaft mode. info(s"The alter partition request successfully updated the partition state to $proposedIsrState but " + "this replica got removed from the replica set while completing a reassignment. " + "Waiting on new metadata to clean up this replica.") false{code} As we said in the log, "Partition state may be out of sync, awaiting new the latest metadata". But without updating the partition state means it will stays at `PendingExpandIsr` or `PendingShrinkIsr` state, which keeps the `isInflight` to true. Under this state, the partition state will never be updated anymore. The impact of this issue is that the ISR state will be in stale(wrong) state until leadership change. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-18911) alterPartition gets stuck when getting out-of-date errors
[ https://issues.apache.org/jira/browse/KAFKA-18911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen resolved KAFKA-18911. --- Resolution: Invalid > alterPartition gets stuck when getting out-of-date errors > - > > Key: KAFKA-18911 > URL: https://issues.apache.org/jira/browse/KAFKA-18911 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.9.0 >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > > When the leader node sends the AlterPartition request to the controller, the > controller will do [some > validation|https://github.com/apache/kafka/blob/898dcd11ad260e9b3cfefc5291c40e68009acb7d/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java#L1231] > before processing it. And in the leader node side, when receiving the > errors, we'll decide if it should be retried or not > [here|https://github.com/apache/kafka/blob/898dcd11ad260e9b3cfefc5291c40e68009acb7d/core/src/main/scala/kafka/cluster/Partition.scala#L1868]. > However, in some non-retry cases, we directly return false without changing > the state: > > {code:java} > case Errors.UNKNOWN_TOPIC_OR_PARTITION => > info(s"Failed to alter partition to $proposedIsrState since the controller > doesn't know about " + > "this topic or partition. Partition state may be out of sync, awaiting > new the latest metadata.") > false > case Errors.UNKNOWN_TOPIC_ID => > info(s"Failed to alter partition to $proposedIsrState since the controller > doesn't know about " + > "this topic. Partition state may be out of sync, awaiting new the latest > metadata.") > false > case Errors.FENCED_LEADER_EPOCH => > info(s"Failed to alter partition to $proposedIsrState since the leader > epoch is old. " + > "Partition state may be out of sync, awaiting new the latest metadata.") > false > case Errors.INVALID_UPDATE_VERSION => > info(s"Failed to alter partition to $proposedIsrState because the partition > epoch is invalid. " + > "Partition state may be out of sync, awaiting new the latest metadata.") > false > case Errors.INVALID_REQUEST => > info(s"Failed to alter partition to $proposedIsrState because the request > is invalid. " + > "Partition state may be out of sync, awaiting new the latest metadata.") > false > case Errors.NEW_LEADER_ELECTED => > // The operation completed successfully but this replica got removed from > the replica set by the controller > // while completing a ongoing reassignment. This replica is no longer the > leader but it does not know it > // yet. It should remain in the current pending state until the metadata > overrides it. > // This is only raised in KRaft mode. > info(s"The alter partition request successfully updated the partition state > to $proposedIsrState but " + > "this replica got removed from the replica set while completing a > reassignment. " + > "Waiting on new metadata to clean up this replica.") > false{code} > As we said in the log, "Partition state may be out of sync, awaiting new the > latest metadata". But without updating the partition state means it will > stays at `PendingExpandIsr` or `PendingShrinkIsr` state, which keeps the > `isInflight` to true. Under this state, the partition state will never be > updated anymore. > > The impact of this issue is that the ISR state will be in stale(wrong) state > until leadership change. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16936) Upgrade slf4k to 2.0.9 and integrate "-Dslf4j.provider" to kafka script
[ https://issues.apache.org/jira/browse/KAFKA-16936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17931926#comment-17931926 ] Muralidhar Basani commented on KAFKA-16936: --- Hi [~frankvicky] sorry, could not respond earlier. Thank you for taking over. > Upgrade slf4k to 2.0.9 and integrate "-Dslf4j.provider" to kafka script > --- > > Key: KAFKA-16936 > URL: https://issues.apache.org/jira/browse/KAFKA-16936 > Project: Kafka > Issue Type: New Feature >Reporter: Chia-Ping Tsai >Assignee: TengYao Chi >Priority: Major > Labels: need-kip > > origin discussion: > [https://github.com/apache/kafka/pull/16260#issuecomment-2159632052] > The specific provider class can be defined by `slf4j.provider`[0]. Hence, we > can add the slf4j backends we care about to dependencies. With that, our > distributions will have different slf4j backends and it is safe as we will > define slf4j.provider in our script. Also, those slf4j backends will be > collected to "dependend-libs", and hence we can run kafka instance from > source code with specific provider too. > In short, the following tasks are included by this jira > 1. upgrade slf4j from 1.7.36 to 2.0.9+ > 2. add a new system variable to script to define -Dslf4j.provider easily. By > default we use org.slf4j.reload4j.Reload4jServiceProvider > 3. add other slf4j backend dependencies (optional) > This change needs KIP since slf4j requires the version match between the > provider and slf4j-api.jar. Hence, users may encounter compatibility issue if > they have added other providers jar into kafka classpath. > [0] https://www.slf4j.org/manual.html -- This message was sent by Atlassian Jira (v8.20.10#820010)
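The slf4j.provider property referenced in [0] can also be set programmatically before logging is initialized; a small sketch, assuming slf4j-api 2.0.9+ and the reload4j provider on the classpath (the kafka scripts would pass the equivalent as -Dslf4j.provider=...).
{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Slf4jProviderSketch {
    public static void main(String[] args) {
        // Must be set before the first LoggerFactory call in the JVM.
        System.setProperty("slf4j.provider", "org.slf4j.reload4j.Reload4jServiceProvider");
        Logger log = LoggerFactory.getLogger(Slf4jProviderSketch.class);
        log.info("Provider selected explicitly; no classpath scanning needed.");
    }
}
{code}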
Re: [PR] KAFKA-18142 switch Kafka's Gradle build shadow plugin to `com.gradleup.shadow` (and upgrade plugin version) [kafka]
apoorvmittal10 commented on PR #18018: URL: https://github.com/apache/kafka/pull/18018#issuecomment-2693906211 @dejan2609 Can you please merge the upstream changes and re-run the build to see if the tests pass? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14484: Move UnifiedLog to storage module [kafka]
mimaison commented on code in PR #19030: URL: https://github.com/apache/kafka/pull/19030#discussion_r1977352466 ## storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java: ## @@ -16,44 +16,2351 @@ */ package org.apache.kafka.storage.internals.log; +import org.apache.kafka.common.InvalidRecordException; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.errors.InconsistentTopicIdException; +import org.apache.kafka.common.errors.InvalidTxnStateException; +import org.apache.kafka.common.errors.KafkaStorageException; +import org.apache.kafka.common.errors.OffsetOutOfRangeException; +import org.apache.kafka.common.errors.RecordBatchTooLargeException; +import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.DescribeProducersResponseData; +import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MutableRecordBatch; +import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.RecordValidationStats; +import org.apache.kafka.common.record.RecordVersion; import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.PrimitiveRef; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.OffsetAndEpoch; +import org.apache.kafka.server.common.RequestLocal; import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig; +import org.apache.kafka.server.metrics.KafkaMetricsGroup; +import org.apache.kafka.server.record.BrokerCompressionType; +import org.apache.kafka.server.storage.log.FetchIsolation; +import org.apache.kafka.server.storage.log.UnexpectedAppendOffsetException; import org.apache.kafka.server.util.Scheduler; import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; +import org.apache.kafka.storage.internals.checkpoint.PartitionMetadataFile; import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics; +import org.apache.kafka.storage.log.metrics.BrokerTopicStats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Predicate; +import 
java.util.stream.Collectors; +import java.util.stream.StreamSupport; -public class UnifiedLog { +import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET; + +/** + * A log which presents a unified view of local and tiered log segments. + * + * The log consists of tiered and local segments with the tiered portion of the log being optional. There could be an + * overlap between the tiered and local segments. The active segment is always guaranteed to be local. If tiered segments + * are present, they always appear at the beginning of the log, followed by an optional region of overlap, followed by the local + * segments including the active segment. + * + * NOTE: this class handles state and behavior specific to tiered segments as well as any behavior combining both tiered + * and local segments. The state and behavior specific to local segments are handled by the encapsulated LocalLog instance. + */ +public class UnifiedLog implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(UnifiedLog.class); -public static final String LOG_FILE_SUFFIX = LogFileUtils.LOG_FILE_SUFFIX; -public static final String INDEX_FILE_SUFFIX = LogFileUtils.INDEX_FILE_SUFFIX; -public static final String TIME_INDEX_FILE_SUFFIX = LogFileUtils.TIME_INDEX_FILE_SUFFIX; -public static final String TXN_INDEX_FILE_SUFFIX = Lo
Re: [PR] KAFKA-18899: Limit retry time for ShareConsumer.commitAsync [kafka]
AndrewJSchofield commented on PR #19060: URL: https://github.com/apache/kafka/pull/19060#issuecomment-2693753771 Converting to draft PR. Additional testing revealed that the time-out logic needs to be a bit more sophisticated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18142 switch Kafka's Gradle build shadow plugin to `com.gradleup.shadow` (and upgrade plugin version) [kafka]
dejan2609 commented on PR #18018: URL: https://github.com/apache/kafka/pull/18018#issuecomment-2694058225 > @dejan2609 Can you please merge the upstream changes and re-run the build to see if the tests pass? @apoorvmittal10 Done, the PR is rebased onto trunk. I opted to keep all commits separate for the sake of clarity in case someone else wants to review (I will squash them into one when the time comes). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-18477) remove usage of OffsetForLeaderEpochRequest in AbstractFetcherThread
[ https://issues.apache.org/jira/browse/KAFKA-18477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17931915#comment-17931915 ] Chia-Ping Tsai commented on KAFKA-18477: {quote} It's unlikely that any topic would only have V0/V1 records since any writes with IBP >= 3.0 would result in a V2 record being written. {quote} I understand it's an edge case, but a topic with only v0/v1 records is still valid in 4.x, right? {quote} latestEpoch(tp) should always be available since we only have V2 message format in 4.0 {quote} Regardless of record version, the leaderEpochCache is updated when appending records. Therefore, the follower fetcher doesn't know the latestEpoch when starting the fetch thread. However, this looks like a no-op because it truncates the offset to 0. {code:java} [2025-03-03 09:21:43,792] INFO [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Truncating partition 33opic-0 with TruncationState(offset=0, completed=true) due to local high watermark 0 (kafka.server.ReplicaFetcherThread) [2025-03-03 09:21:43,792] INFO [UnifiedLog partition=33opic-0, dir=/tmp/kafka-logs] Truncating to 0 has no effect as the largest offset in the log is -1 (kafka.log.UnifiedLog) {code} > remove usage of OffsetForLeaderEpochRequest in AbstractFetcherThread > > > Key: KAFKA-18477 > URL: https://issues.apache.org/jira/browse/KAFKA-18477 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 4.0.0 >Reporter: Jun Rao >Assignee: 黃竣陽 >Priority: Major > Fix For: 4.1.0 > > > This is because of the base MV in 4.0 is 3.0. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [MINOR] Clean up metadata module [kafka]
sjhajharia commented on PR #19069: URL: https://github.com/apache/kafka/pull/19069#issuecomment-2694200768 Thanks @m1a2st for catching those classes. I've updated them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17516: Synonyms for client metrics configs [kafka]
chia7712 commented on PR #17264: URL: https://github.com/apache/kafka/pull/17264#issuecomment-2694338122 @AndrewJSchofield could you please check the build error? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-18912) Failed test MetadataSchemaCheckerToolTest.testVerifyEvolutionGit on Trunk
[ https://issues.apache.org/jira/browse/KAFKA-18912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17931965#comment-17931965 ] Chia-Ping Tsai commented on KAFKA-18912: It seems KAFKA-4650 is related to the streams module, so you can run only the streams module tests. For example: ./gradlew clean streams:test > Failed test MetadataSchemaCheckerToolTest.testVerifyEvolutionGit on Trunk > - > > Key: KAFKA-18912 > URL: https://issues.apache.org/jira/browse/KAFKA-18912 > Project: Kafka > Issue Type: Bug >Reporter: Lorcan >Priority: Minor > > I am unable to build (up to date as of 03/03) trunk with no local changes due > to a failed test: MetadataSchemaCheckerToolTest.testVerifyEvolutionGit. > Build error message shown below: > {code:java} > java.lang.IllegalStateException: Did not find expected file > metadata/src/main/resources/common/metadata/AbortTransactionRecord.json > at > org.apache.kafka.message.checker.CheckerUtils.getDataFromGit(CheckerUtils.java:146) > at > org.apache.kafka.message.checker.MetadataSchemaCheckerTool.run(MetadataSchemaCheckerTool.java:107) > at > org.apache.kafka.message.checker.MetadataSchemaCheckerToolTest.testVerifyEvolutionGit(MetadataSchemaCheckerToolTest.java:32) > at java.base/java.lang.reflect.Method.invoke(Method.java:568) > at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) > at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-13022: Optimize ClientQuotasImage#describe (wip) [kafka]
FrankYang0529 opened a new pull request, #19079: URL: https://github.com/apache/kafka/pull/19079 Delete this text and replace it with a detailed description of your change. The PR title and body will become the squashed commit message. If you would like to tag individuals, add some commentary, upload images, or include other supplemental information that should not be part of the eventual commit message, please use a separate comment. If applicable, please include a summary of the testing strategy (including rationale) for the proposed change. Unit and/or integration tests are expected for any behavior change and system tests should be considered for larger changes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14484: Move UnifiedLog to storage module [kafka]
mimaison commented on PR #19030: URL: https://github.com/apache/kafka/pull/19030#issuecomment-2694168281 Thanks @chia7712 for the review! I pushed an update. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18804: Remove slf4j warning when using tool script [kafka]
m1a2st commented on code in PR #18918: URL: https://github.com/apache/kafka/pull/18918#discussion_r1977444283 ## build.gradle: ## @@ -1075,7 +1075,6 @@ project(':core') { from (configurations.runtimeClasspath) { exclude('kafka-clients*') } -from (configurations.releaseOnly) Review Comment: Built with `./gradlew clean build -x test`, and it also works as expected. ``` ./bin/kafka-storage.sh -h usage: kafka-storage [-h] {info,format,version-mapping,feature-dependencies,random-uuid} ... ... ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18876: 4.0 documentation improvement [kafka]
m1a2st commented on code in PR #19065: URL: https://github.com/apache/kafka/pull/19065#discussion_r1977515801 ## docs/ops.html: ## @@ -4187,7 +4187,7 @@ org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor interface and specifying the full class name in the configuration. Review Comment: > Unless consistent font size is the top priority, keeping the entire package and class name on the same line makes it easier to copy, search and read. I don't think it's normal for the font size to vary when reading a document. It seriously affects the reading experience. > Adding a line break within the class name could make it unsearchable and cause hidden formatting issues in copy-pasting Can you avoid using line breaks to split the code inside the code block and only add them before and after the code block instead? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-18912) Failed test MetadataSchemaCheckerToolTest.testVerifyEvolutionGit on Trunk
[ https://issues.apache.org/jira/browse/KAFKA-18912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17931961#comment-17931961 ] Lorcan commented on KAFKA-18912: Hi [~chia7712], thanks for the information, I wasn't aware of this. I think this might also block me as I have a PR against trunk that I'm working on for a [testing ticket|https://issues.apache.org/jira/browse/KAFKA-4650]. But I'm new to the project and still trying to understand the different pieces. > Failed test MetadataSchemaCheckerToolTest.testVerifyEvolutionGit on Trunk > - > > Key: KAFKA-18912 > URL: https://issues.apache.org/jira/browse/KAFKA-18912 > Project: Kafka > Issue Type: Bug >Reporter: Lorcan >Priority: Minor > > I am unable to build (up to date as of 03/03) trunk with no local changes due > to a failed test: MetadataSchemaCheckerToolTest.testVerifyEvolutionGit. > Build error message shown below: > {code:java} > java.lang.IllegalStateException: Did not find expected file > metadata/src/main/resources/common/metadata/AbortTransactionRecord.json > at > org.apache.kafka.message.checker.CheckerUtils.getDataFromGit(CheckerUtils.java:146) > at > org.apache.kafka.message.checker.MetadataSchemaCheckerTool.run(MetadataSchemaCheckerTool.java:107) > at > org.apache.kafka.message.checker.MetadataSchemaCheckerToolTest.testVerifyEvolutionGit(MetadataSchemaCheckerToolTest.java:32) > at java.base/java.lang.reflect.Method.invoke(Method.java:568) > at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) > at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-18827: Initialize share group state group coordinator impl. [3/N] [kafka]
AndrewJSchofield commented on code in PR #19026: URL: https://github.com/apache/kafka/pull/19026#discussion_r1977260665 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -454,6 +466,77 @@ public CompletableFuture shareGroupHeartbeat( )); } +// Visibility for testing +CompletableFuture persisterInitialize( +InitializeShareGroupStateParameters request, +ShareGroupHeartbeatResponseData defaultResponse +) { +return persister.initializeState(request) +.thenCompose( +response -> handlePersisterInitializeResponse(request.groupTopicPartitionData().groupId(), response, defaultResponse) +).exceptionally(exception -> { +GroupTopicPartitionData gtp = request.groupTopicPartitionData(); +log.error("Unable to initialize share group state {}, {}", gtp.groupId(), gtp.topicsData(), exception); +return new ShareGroupHeartbeatResponseData() +.setErrorCode(Errors.forException(exception).code()); +}); +} + +private CompletableFuture handlePersisterInitializeResponse( +String groupId, +InitializeShareGroupStateResult persisterInitializeResult, +ShareGroupHeartbeatResponseData defaultResponse +) { + +short persisterErrorCode = Errors.NONE.code(); +for (TopicData topicData : persisterInitializeResult.topicsData()) { +Optional errData = topicData.partitions().stream().filter(partition -> partition.errorCode() != Errors.NONE.code()).findAny(); +if (errData.isPresent()) { +persisterErrorCode = errData.get().errorCode(); +break; +} +} + +if (persisterErrorCode == Errors.NONE.code()) { +Map> topicPartitionMap = new HashMap<>(); +for (TopicData topicData : persisterInitializeResult.topicsData()) { +topicPartitionMap.put( +topicData.topicId(), + topicData.partitions().stream().map(PartitionErrorData::partition).collect(Collectors.toSet()) +); +} +if (topicPartitionMap.isEmpty()) { +return CompletableFuture.completedFuture(defaultResponse); +} +return performShareGroupStateMetadataInitialize(groupId, topicPartitionMap, defaultResponse); +} else { +log.error("Received error while calling initialize state for {} on persister {}.", groupId, persisterErrorCode); +return CompletableFuture.completedFuture( +new ShareGroupHeartbeatResponseData() +.setErrorCode(persisterErrorCode) +); Review Comment: Error message? 
## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -454,6 +466,77 @@ public CompletableFuture shareGroupHeartbeat( )); } +// Visibility for testing +CompletableFuture persisterInitialize( +InitializeShareGroupStateParameters request, +ShareGroupHeartbeatResponseData defaultResponse +) { +return persister.initializeState(request) +.thenCompose( +response -> handlePersisterInitializeResponse(request.groupTopicPartitionData().groupId(), response, defaultResponse) +).exceptionally(exception -> { +GroupTopicPartitionData gtp = request.groupTopicPartitionData(); +log.error("Unable to initialize share group state {}, {}", gtp.groupId(), gtp.topicsData(), exception); +return new ShareGroupHeartbeatResponseData() +.setErrorCode(Errors.forException(exception).code()); +}); +} + +private CompletableFuture handlePersisterInitializeResponse( +String groupId, +InitializeShareGroupStateResult persisterInitializeResult, +ShareGroupHeartbeatResponseData defaultResponse +) { + Review Comment: nit: Unnecessary blank line ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -2322,11 +2345,107 @@ private CoordinatorResult sh //(subscribedTopicNames) to detect a full request as those must be set in a full request. // 2. The member's assignment has been updated. boolean isFullRequest = subscribedTopicNames != null; +List initializeCandidateTopics = List.of(); if (memberEpoch == 0 || isFullRequest || hasAssignedPartitionsChanged(member, updatedMember)) { response.setAssignment(createShareGroupResponseAssignment(updatedMember)); +initializeCandidateTopics = (subscribedTopicNames == null || subscribedTopicNames.isEmpty()) ? +group.subscribedTopicNames().keySet().stream().toList() : subs
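On the "Error message?" review comment above, a hedged sketch of what the error path could return instead; it assumes `ShareGroupHeartbeatResponseData` exposes a `setErrorMessage` setter like the other group heartbeat response schemas, and it is meant to slot into the `else` branch shown in the diff rather than stand alone:
```java
// Sketch only: pair the persister error code with a human-readable message.
return CompletableFuture.completedFuture(
    new ShareGroupHeartbeatResponseData()
        .setErrorCode(persisterErrorCode)
        .setErrorMessage(Errors.forCode(persisterErrorCode).message()));
```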
Re: [PR] KAFKA-14484: Move UnifiedLog to storage module [kafka]
mimaison commented on code in PR #19030: URL: https://github.com/apache/kafka/pull/19030#discussion_r1977414967 ## storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java: ## @@ -16,44 +16,2351 @@ */ package org.apache.kafka.storage.internals.log; +import org.apache.kafka.common.InvalidRecordException; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.errors.InconsistentTopicIdException; +import org.apache.kafka.common.errors.InvalidTxnStateException; +import org.apache.kafka.common.errors.KafkaStorageException; +import org.apache.kafka.common.errors.OffsetOutOfRangeException; +import org.apache.kafka.common.errors.RecordBatchTooLargeException; +import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.DescribeProducersResponseData; +import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MutableRecordBatch; +import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.RecordValidationStats; +import org.apache.kafka.common.record.RecordVersion; import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.PrimitiveRef; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.OffsetAndEpoch; +import org.apache.kafka.server.common.RequestLocal; import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig; +import org.apache.kafka.server.metrics.KafkaMetricsGroup; +import org.apache.kafka.server.record.BrokerCompressionType; +import org.apache.kafka.server.storage.log.FetchIsolation; +import org.apache.kafka.server.storage.log.UnexpectedAppendOffsetException; import org.apache.kafka.server.util.Scheduler; import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; +import org.apache.kafka.storage.internals.checkpoint.PartitionMetadataFile; import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics; +import org.apache.kafka.storage.log.metrics.BrokerTopicStats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Predicate; +import 
java.util.stream.Collectors; +import java.util.stream.StreamSupport; -public class UnifiedLog { +import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET; + +/** + * A log which presents a unified view of local and tiered log segments. + * + * The log consists of tiered and local segments with the tiered portion of the log being optional. There could be an + * overlap between the tiered and local segments. The active segment is always guaranteed to be local. If tiered segments + * are present, they always appear at the beginning of the log, followed by an optional region of overlap, followed by the local + * segments including the active segment. + * + * NOTE: this class handles state and behavior specific to tiered segments as well as any behavior combining both tiered + * and local segments. The state and behavior specific to local segments are handled by the encapsulated LocalLog instance. + */ +public class UnifiedLog implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(UnifiedLog.class); -public static final String LOG_FILE_SUFFIX = LogFileUtils.LOG_FILE_SUFFIX; -public static final String INDEX_FILE_SUFFIX = LogFileUtils.INDEX_FILE_SUFFIX; -public static final String TIME_INDEX_FILE_SUFFIX = LogFileUtils.TIME_INDEX_FILE_SUFFIX; -public static final String TXN_INDEX_FILE_SUFFIX = Lo
[jira] [Created] (KAFKA-18912) Failed test MetadataSchemaCheckerToolTest.testVerifyEvolutionGit on Trunk
Lorcan created KAFKA-18912: -- Summary: Failed test MetadataSchemaCheckerToolTest.testVerifyEvolutionGit on Trunk Key: KAFKA-18912 URL: https://issues.apache.org/jira/browse/KAFKA-18912 Project: Kafka Issue Type: Bug Reporter: Lorcan I am unable to build trunk with no local changes due to a failed test: MetadataSchemaCheckerToolTest.testVerifyEvolutionGit. Build error message shown below: {code:java} java.lang.IllegalStateException: Did not find expected file metadata/src/main/resources/common/metadata/AbortTransactionRecord.json at org.apache.kafka.message.checker.CheckerUtils.getDataFromGit(CheckerUtils.java:146) at org.apache.kafka.message.checker.MetadataSchemaCheckerTool.run(MetadataSchemaCheckerTool.java:107) at org.apache.kafka.message.checker.MetadataSchemaCheckerToolTest.testVerifyEvolutionGit(MetadataSchemaCheckerToolTest.java:32) at java.base/java.lang.reflect.Method.invoke(Method.java:568) at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14484: Move UnifiedLog to storage module [kafka]
mimaison commented on code in PR #19030: URL: https://github.com/apache/kafka/pull/19030#discussion_r1977414279 ## storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java: ## @@ -16,44 +16,2351 @@ */ package org.apache.kafka.storage.internals.log; +import org.apache.kafka.common.InvalidRecordException; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.errors.InconsistentTopicIdException; +import org.apache.kafka.common.errors.InvalidTxnStateException; +import org.apache.kafka.common.errors.KafkaStorageException; +import org.apache.kafka.common.errors.OffsetOutOfRangeException; +import org.apache.kafka.common.errors.RecordBatchTooLargeException; +import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.DescribeProducersResponseData; +import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MutableRecordBatch; +import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.RecordValidationStats; +import org.apache.kafka.common.record.RecordVersion; import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.PrimitiveRef; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.OffsetAndEpoch; +import org.apache.kafka.server.common.RequestLocal; import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig; +import org.apache.kafka.server.metrics.KafkaMetricsGroup; +import org.apache.kafka.server.record.BrokerCompressionType; +import org.apache.kafka.server.storage.log.FetchIsolation; +import org.apache.kafka.server.storage.log.UnexpectedAppendOffsetException; import org.apache.kafka.server.util.Scheduler; import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; +import org.apache.kafka.storage.internals.checkpoint.PartitionMetadataFile; import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics; +import org.apache.kafka.storage.log.metrics.BrokerTopicStats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Predicate; +import 
java.util.stream.Collectors; +import java.util.stream.StreamSupport; -public class UnifiedLog { +import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET; + +/** + * A log which presents a unified view of local and tiered log segments. + * + * The log consists of tiered and local segments with the tiered portion of the log being optional. There could be an + * overlap between the tiered and local segments. The active segment is always guaranteed to be local. If tiered segments + * are present, they always appear at the beginning of the log, followed by an optional region of overlap, followed by the local + * segments including the active segment. + * + * NOTE: this class handles state and behavior specific to tiered segments as well as any behavior combining both tiered + * and local segments. The state and behavior specific to local segments are handled by the encapsulated LocalLog instance. + */ +public class UnifiedLog implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(UnifiedLog.class); -public static final String LOG_FILE_SUFFIX = LogFileUtils.LOG_FILE_SUFFIX; -public static final String INDEX_FILE_SUFFIX = LogFileUtils.INDEX_FILE_SUFFIX; -public static final String TIME_INDEX_FILE_SUFFIX = LogFileUtils.TIME_INDEX_FILE_SUFFIX; -public static final String TXN_INDEX_FILE_SUFFIX = Lo
Re: [PR] KAFKA-18332: fix ClassDataAbstractionCoupling problem in KafkaRaftClientTest(1/2) [kafka]
chia7712 commented on PR #18926: URL: https://github.com/apache/kafka/pull/18926#issuecomment-2694325084 @leaf-soba could you please fix the build error? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-18912) Failed test MetadataSchemaCheckerToolTest.testVerifyEvolutionGit on Trunk
[ https://issues.apache.org/jira/browse/KAFKA-18912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lorcan updated KAFKA-18912: --- Description: I am unable to build (up to date as of 03/03) trunk with no local changes due to a failed test: MetadataSchemaCheckerToolTest.testVerifyEvolutionGit. Build error message shown below: {code:java} java.lang.IllegalStateException: Did not find expected file metadata/src/main/resources/common/metadata/AbortTransactionRecord.json at org.apache.kafka.message.checker.CheckerUtils.getDataFromGit(CheckerUtils.java:146) at org.apache.kafka.message.checker.MetadataSchemaCheckerTool.run(MetadataSchemaCheckerTool.java:107) at org.apache.kafka.message.checker.MetadataSchemaCheckerToolTest.testVerifyEvolutionGit(MetadataSchemaCheckerToolTest.java:32) at java.base/java.lang.reflect.Method.invoke(Method.java:568) at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) {code} was: I am unable to build trunk with no local changes due to a failed test: MetadataSchemaCheckerToolTest.testVerifyEvolutionGit. Build error message shown below: {code:java} java.lang.IllegalStateException: Did not find expected file metadata/src/main/resources/common/metadata/AbortTransactionRecord.json at org.apache.kafka.message.checker.CheckerUtils.getDataFromGit(CheckerUtils.java:146) at org.apache.kafka.message.checker.MetadataSchemaCheckerTool.run(MetadataSchemaCheckerTool.java:107) at org.apache.kafka.message.checker.MetadataSchemaCheckerToolTest.testVerifyEvolutionGit(MetadataSchemaCheckerToolTest.java:32) at java.base/java.lang.reflect.Method.invoke(Method.java:568) at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) {code} > Failed test MetadataSchemaCheckerToolTest.testVerifyEvolutionGit on Trunk > - > > Key: KAFKA-18912 > URL: https://issues.apache.org/jira/browse/KAFKA-18912 > Project: Kafka > Issue Type: Bug >Reporter: Lorcan >Priority: Minor > > I am unable to build (up to date as of 03/03) trunk with no local changes due > to a failed test: MetadataSchemaCheckerToolTest.testVerifyEvolutionGit. > Build error message shown below: > {code:java} > java.lang.IllegalStateException: Did not find expected file > metadata/src/main/resources/common/metadata/AbortTransactionRecord.json > at > org.apache.kafka.message.checker.CheckerUtils.getDataFromGit(CheckerUtils.java:146) > at > org.apache.kafka.message.checker.MetadataSchemaCheckerTool.run(MetadataSchemaCheckerTool.java:107) > at > org.apache.kafka.message.checker.MetadataSchemaCheckerToolTest.testVerifyEvolutionGit(MetadataSchemaCheckerToolTest.java:32) > at java.base/java.lang.reflect.Method.invoke(Method.java:568) > at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) > at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-18912) Failed test MetadataSchemaCheckerToolTest.testVerifyEvolutionGit on Trunk
[ https://issues.apache.org/jira/browse/KAFKA-18912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17931958#comment-17931958 ] Chia-Ping Tsai commented on KAFKA-18912: [~lorcanj] If all you want to do is build the code, you could skip the tests with the following command: "./gradlew clean build -x test" > Failed test MetadataSchemaCheckerToolTest.testVerifyEvolutionGit on Trunk > - > > Key: KAFKA-18912 > URL: https://issues.apache.org/jira/browse/KAFKA-18912 > Project: Kafka > Issue Type: Bug >Reporter: Lorcan >Priority: Minor > > I am unable to build trunk with no local changes due to a failed test: > MetadataSchemaCheckerToolTest.testVerifyEvolutionGit. > Build error message shown below: > {code:java} > java.lang.IllegalStateException: Did not find expected file > metadata/src/main/resources/common/metadata/AbortTransactionRecord.json > at > org.apache.kafka.message.checker.CheckerUtils.getDataFromGit(CheckerUtils.java:146) > at > org.apache.kafka.message.checker.MetadataSchemaCheckerTool.run(MetadataSchemaCheckerTool.java:107) > at > org.apache.kafka.message.checker.MetadataSchemaCheckerToolTest.testVerifyEvolutionGit(MetadataSchemaCheckerToolTest.java:32) > at java.base/java.lang.reflect.Method.invoke(Method.java:568) > at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) > at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-18867: add tests to describe topic configs with empty name [kafka]
Rancho-7 commented on code in PR #19075: URL: https://github.com/apache/kafka/pull/19075#discussion_r1977580342 ## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala: ## @@ -1626,6 +1626,11 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val existingTopic = new ConfigResource(ConfigResource.Type.TOPIC, topic) client.describeConfigs(Collections.singletonList(existingTopic)).values.get(existingTopic).get() +val defaultTopic = new ConfigResource(ConfigResource.Type.TOPIC, "") +val describeResult0 = client.describeConfigs(Collections.singletonList(defaultTopic)) + +assertTrue(assertThrows(classOf[ExecutionException], () => describeResult0.values.get(defaultTopic).get).getCause.isInstanceOf[InvalidTopicException]) Review Comment: Thanks for the review! I will try to fix it in this way. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18867: add tests to describe topic configs with empty name [kafka]
Rancho-7 commented on code in PR #19075: URL: https://github.com/apache/kafka/pull/19075#discussion_r1977584859 ## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala: ## @@ -1626,6 +1626,11 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val existingTopic = new ConfigResource(ConfigResource.Type.TOPIC, topic) client.describeConfigs(Collections.singletonList(existingTopic)).values.get(existingTopic).get() +val defaultTopic = new ConfigResource(ConfigResource.Type.TOPIC, "") +val describeResult0 = client.describeConfigs(Collections.singletonList(defaultTopic)) Review Comment: Thanks for the suggestion! I agree that would make the code cleaner. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18864:remove the Evolving tag from stable public interfaces [kafka]
dajac commented on PR #19036: URL: https://github.com/apache/kafka/pull/19036#issuecomment-2694537335 @junrao I cherry-picked the commit to 4.0. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Don't print cleaning group metadata log if empty [kafka]
dajac opened a new pull request, #19080: URL: https://github.com/apache/kafka/pull/19080 The new group coordinator prints the following line at a fixed interval even if no groups were deleted: ``` Generated 0 tombstone records while cleaning up group metadata in 0 milliseconds. (org.apache.kafka.coordinator.group.GroupCoordinatorShard) ``` The time component has some value on its own, but it may be better not to print the line when there are no records, in order to reduce the spam in the logs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
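A minimal sketch of the proposed guard (the `records` list, `time`, and `startTimeMs` names are assumptions for illustration, not necessarily the actual GroupCoordinatorShard fields):
```java
// Sketch only: emit the cleanup summary only when tombstone records were actually generated.
if (!records.isEmpty()) {
    log.info("Generated {} tombstone records while cleaning up group metadata in {} milliseconds.",
        records.size(), time.milliseconds() - startTimeMs);
}
```
The timing information is still reported whenever a cleanup actually deletes something, so the useful signal is kept while the periodic no-op lines disappear.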
Re: [PR] KAFKA-16718-2/n: KafkaAdminClient and GroupCoordinator implementation for DeleteShareGroupOffsets RPC [kafka]
AndrewJSchofield commented on code in PR #18976: URL: https://github.com/apache/kafka/pull/18976#discussion_r1977599300 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -1206,6 +1210,93 @@ public CompletableFuture deleteShareGroupOffsets( +RequestContext context, +DeleteShareGroupOffsetsRequestData requestData +) { +if (!isActive.get()) { +return CompletableFuture.completedFuture( + DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.COORDINATOR_NOT_AVAILABLE)); +} + +if (metadataImage == null) { +return CompletableFuture.completedFuture( + DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.COORDINATOR_NOT_AVAILABLE)); +} + +Map requestTopicIdToNameMapping = new HashMap<>(); +List deleteShareGroupStateRequestTopicsData = new ArrayList<>(requestData.topics().size()); + List deleteShareGroupOffsetsResponseTopicList = new ArrayList<>(requestData.topics().size()); + +requestData.topics().forEach(topic -> { +Uuid topicId = metadataImage.topics().topicNameToIdView().get(topic.topicName()); +if (topicId != null) { +requestTopicIdToNameMapping.put(topicId, topic.topicName()); +deleteShareGroupStateRequestTopicsData.add(new DeleteShareGroupStateRequestData.DeleteStateData() +.setTopicId(topicId) +.setPartitions( +topic.partitions().stream().map( +partitionIndex -> new DeleteShareGroupStateRequestData.PartitionData().setPartition(partitionIndex) +).toList() +)); +} else { +deleteShareGroupOffsetsResponseTopicList.add(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic() +.setTopicName(topic.topicName()) +.setPartitions(topic.partitions().stream().map( +partition -> new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition() +.setPartitionIndex(partition) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) + .setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()) +).toList())); +} +}); + +// If the request for the persister is empty, just complete the operation right away. +if (deleteShareGroupStateRequestTopicsData.isEmpty()) { +return CompletableFuture.completedFuture( +new DeleteShareGroupOffsetsResponseData() +.setResponses(deleteShareGroupOffsetsResponseTopicList)); +} + +DeleteShareGroupStateRequestData deleteShareGroupStateRequestData = new DeleteShareGroupStateRequestData() +.setGroupId(requestData.groupId()) +.setTopics(deleteShareGroupStateRequestTopicsData); +CompletableFuture future = new CompletableFuture<>(); + persister.deleteState(DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData)) +.whenComplete((result, error) -> { +if (error != null) { +log.error("Failed to delete share partitions"); Review Comment: I think you should replace "delete share partitions" with "delete share group state". -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
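Following the review comment above, a sketch of how the reworded error log might look; the exact wording is only a suggestion, and `requestData` and `error` are the variables already in scope in the quoted snippet:
```java
// Sketch only: name the group whose share group state could not be deleted and keep the cause.
log.error("Failed to delete share group state for group {}", requestData.groupId(), error);
```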
Re: [PR] KAFKA-18844: Stale features information in QuorumController#registerBroker (#18997) [kafka]
dajac commented on PR #19058: URL: https://github.com/apache/kafka/pull/19058#issuecomment-2694563500 > @dajac, I think we can just cherry-pick it, because there is no conflict. Sounds good. I did it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-18864) remove the Evolving tag from stable public interfaces
[ https://issues.apache.org/jira/browse/KAFKA-18864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-18864: Fix Version/s: 4.0.0 (was: 4.1.0) > remove the Evolving tag from stable public interfaces > - > > Key: KAFKA-18864 > URL: https://issues.apache.org/jira/browse/KAFKA-18864 > Project: Kafka > Issue Type: Improvement > Components: Tiered-Storage >Affects Versions: 3.9.0 >Reporter: Jun Rao >Assignee: xuanzhang gong >Priority: Major > Fix For: 4.0.0 > > Attachments: 螢幕快照 2025-02-26 14-56-31.png > > > Since tiered storage GA-ed in 3.9.0, we need to remove the Evolving tag from > public interfaces. cc [~satish.duggana] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-18878: Added share session cache and delayed share fetch metrics (KIP-1103) [kafka]
AndrewJSchofield commented on code in PR #19059: URL: https://github.com/apache/kafka/pull/19059#discussion_r1977587676 ## core/src/test/java/kafka/server/share/DelayedShareFetchTest.java: ## @@ -1118,6 +1119,23 @@ public void testPartitionMaxBytesFromUniformStrategyInCombineLogReadResponse() { true); } +@Test +public void testOnCompleteExecutionOntimeout() { Review Comment: nit: `OnTimeout`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18844: Stale features information in QuorumController#registerBroker [kafka]
dajac commented on PR #18997: URL: https://github.com/apache/kafka/pull/18997#issuecomment-2694548745 I cherry-picked the commit to 4.0. cc @junrao -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18876: 4.0 documentation improvement [kafka]
dajac commented on PR #19065: URL: https://github.com/apache/kafka/pull/19065#issuecomment-2694557772 @mingdaoy Would you have time for addressing @chia7712's comment? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18878: Added share session cache and delayed share fetch metrics (KIP-1103) [kafka]
apoorvmittal10 commented on code in PR #19059: URL: https://github.com/apache/kafka/pull/19059#discussion_r1977614076 ## core/src/test/java/kafka/server/share/DelayedShareFetchTest.java: ## @@ -1118,6 +1119,23 @@ public void testPartitionMaxBytesFromUniformStrategyInCombineLogReadResponse() { true); } +@Test +public void testOnCompleteExecutionOntimeout() { Review Comment: My bad, done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16718-2/n: KafkaAdminClient and GroupCoordinator implementation for DeleteShareGroupOffsets RPC [kafka]
AndrewJSchofield commented on code in PR #18976: URL: https://github.com/apache/kafka/pull/18976#discussion_r1977606102 ## clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteShareGroupOffsetsHandler.java: ## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin.internals; + +import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsOptions; +import org.apache.kafka.clients.admin.KafkaAdminClient; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.DeleteShareGroupOffsetsRequest; +import org.apache.kafka.common.requests.DeleteShareGroupOffsetsResponse; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.utils.LogContext; + +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class is the handler for {@link KafkaAdminClient#deleteShareGroupOffsets(String, Set, DeleteShareGroupOffsetsOptions)} call + */ +public class DeleteShareGroupOffsetsHandler extends AdminApiHandler.Batched> { + +private final CoordinatorKey groupId; + +private final Logger log; + +private final Set partitions; + +private final CoordinatorStrategy lookupStrategy; + + +public DeleteShareGroupOffsetsHandler(String groupId, Set partitions, LogContext logContext) { +this.groupId = CoordinatorKey.byGroupId(groupId); +this.partitions = partitions; +this.log = logContext.logger(DeleteShareGroupOffsetsHandler.class); +this.lookupStrategy = new CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.GROUP, logContext); +} + +@Override +public String apiName() { +return "deleteShareGroupOffsets"; +} + +@Override +public AdminApiLookupStrategy lookupStrategy() { +return lookupStrategy; +} + +public static AdminApiFuture.SimpleAdminApiFuture> newFuture(String groupId) { +return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId))); +} + +private void validateKeys(Set groupIds) { +if (!groupIds.equals(Collections.singleton(groupId))) { +throw new IllegalArgumentException("Received unexpected group ids " + groupIds + +" (expected only " + Collections.singleton(groupId) + ")"); +} +} + +@Override +DeleteShareGroupOffsetsRequest.Builder buildBatchedRequest(int brokerId, Set groupIds) { +validateKeys(groupIds); + +final List topics = +new ArrayList<>(); + 
partitions.stream().collect(Collectors.groupingBy(TopicPartition::topic)).forEach((topic, topicPartitions) -> topics.add( +new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() +.setTopicName(topic) +.setPartitions(topicPartitions.stream() +.map(TopicPartition::partition) +.collect(Collectors.toList()) +) +)); + +return new DeleteShareGroupOffsetsRequest.Builder( +new DeleteShareGroupOffsetsRequestData() +.setGroupId(groupId.idValue) +.setTopics(topics), +true +); +} + +@Override +public ApiResult> handleResponse( +Node coordinator, +Set groupIds, +AbstractResponse abstractResponse +) { +validateKeys(groupIds); + +final DeleteShareGroupOffsetsResponse response = (DeleteShareGroupOffsetsResponse) abstractResponse; + +final Errors groupError = Errors.forCode(response.data().errorCode()); + +if (groupError != Errors.NONE) { +final Set groupsToUnmap = new HashSet<>(); +final Map groupsFailed = new HashMap<>(); +
Re: [PR] KAFKA-18876: 4.0 documentation improvement [kafka]
dajac commented on PR #19065: URL: https://github.com/apache/kafka/pull/19065#issuecomment-2694553565 @chia7712 Good catch! I missed that one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Don't allow cloning a ProducerStateEntry with a different producer id [kafka]
github-actions[bot] commented on PR #18990: URL: https://github.com/apache/kafka/pull/18990#issuecomment-2691891283 A label of 'needs-attention' was automatically added to this PR in order to raise the attention of the committers. Once this issue has been triaged, the `triage` label should be removed to prevent this automation from happening again. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-17981) add Integration test for ConfigCommand to add config `key=[val1,val2]`
[ https://issues.apache.org/jira/browse/KAFKA-17981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-17981. Fix Version/s: 4.1.0 Resolution: Fixed > add Integration test for ConfigCommand to add config `key=[val1,val2]` > -- > > Key: KAFKA-17981 > URL: https://issues.apache.org/jira/browse/KAFKA-17981 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: Nick Guo >Priority: Minor > Fix For: 4.1.0 > > > follow-up of KAFKA-17583 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-18876: 4.0 documentation improvement [kafka]
mingdaoy commented on PR #19065: URL: https://github.com/apache/kafka/pull/19065#issuecomment-2694625874 @dajac Doing it now -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18876: 4.0 documentation improvement [kafka]
dajac commented on code in PR #19065: URL: https://github.com/apache/kafka/pull/19065#discussion_r1977913129 ## docs/ops.html: ## @@ -1343,13 +1343,13 @@ Check and wait until the Lag is small for a majority of the controllers. If the leader's end offset is not increasing, you can wait until the lag is 0 for a majority; otherwise, you can pick the latest leader end offset and wait until all replicas have reached it. Check and wait until the LastFetchTimestamp and LastCaughtUpTimestamp are close to each other for the majority of the controllers. At this point it is safer to format the controller's metadata log directory. This can be done by running the kafka-storage.sh command. - $ bin/kafka-storage.sh format --cluster-id uuid --config server_properties + $ bin/kafka-storage.sh format --cluster-id uuid --config config/server.properties It is possible for the bin/kafka-storage.sh format command above to fail with a message like Log directory ... is already formatted. This can happen when combined mode is used and only the metadata log directory was lost but not the others. In that case and only in that case, can you run the bin/kafka-storage.sh format command with the --ignore-formatted option. Start the KRaft controller after formatting the log directories. - $ bin/kafka-server-start.sh server_properties + $ bin/kafka-server-start.sh server.properties Review Comment: This is inconsistent with the previous change: `config/server.properties`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18646: Null records in fetch response breaks librdkafka [kafka]
junrao commented on code in PR #18726: URL: https://github.com/apache/kafka/pull/18726#discussion_r1977923340 ## clients/src/main/resources/common/message/FetchResponse.json: ## @@ -106,7 +106,7 @@ ]}, { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "default": "-1", "ignorable": false, "entityType": "brokerId", "about": "The preferred read replica for the consumer to use on its next fetch request."}, -{ "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."} Review Comment: Thanks. I missed the update to the KIP wiki. Somehow I didn't see this particular update in the discussion thread. We could discuss a bit more on the options as part of this KIP. I just sent an update to the discussion thread. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] HOTFIX: remove PageView example to support Java11 for :streams:examples module [kafka]
mjsax merged PR #19052: URL: https://github.com/apache/kafka/pull/19052 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18844: Stale features information in QuorumController#registerBroker (#18997) [kafka]
junrao closed pull request #19058: KAFKA-18844: Stale features information in QuorumController#registerBroker (#18997) URL: https://github.com/apache/kafka/pull/19058 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Prevent broker fencing by adjusting resendExponentialBackoff in BrokerLifecycleManager [kafka]
cmccabe merged PR #19061: URL: https://github.com/apache/kafka/pull/19061 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org