[GitHub] [kafka] sudeshwasnik commented on a diff in pull request #12784: MINOR: Do not perform prodcuerCommit on serializationError when trying offsetWriter flush
sudeshwasnik commented on code in PR #12784: URL: https://github.com/apache/kafka/pull/12784#discussion_r1006513264 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java: ## @@ -280,15 +282,16 @@ private void commitTransaction() { // Commit the transaction // Blocks until all outstanding records have been sent and ack'd -try { -producer.commitTransaction(); -} catch (Throwable t) { -log.error("{} Failed to commit producer transaction", ExactlyOnceWorkerSourceTask.this, t); -flushError.compareAndSet(null, t); +if (flushFuture != null) { Review Comment: Started with the same thought, but tried making it consistent with [WorkerSourceTask](https://github.com/apache/kafka/blob/3ae1afa43838066e44ea78918050c6780c208042/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L253-L267). If there are no offsets and no error, we should still commit (producer and task). Need to make changes in WorkerSourceTask as well! I'll make the suggested change ! thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
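For readers following the flush/commit ordering being discussed, here is a minimal, self-contained sketch of the intent (class, field, and method names are illustrative only; this is not the actual ExactlyOnceWorkerSourceTask code): surface a recorded flush error, such as a serialization failure, before committing, but still commit the producer transaction when there are simply no offsets to flush.

```java
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.connect.errors.ConnectException;

public class CommitOrderingSketch {
    /**
     * offsetFlushFuture may be null when there were no offsets to flush; the transaction
     * is still committed so already-dispatched records become visible, mirroring the
     * WorkerSourceTask behaviour referenced in the comment above.
     */
    public static void commitTransaction(Future<Void> offsetFlushFuture,
                                         AtomicReference<Throwable> flushError,
                                         Producer<byte[], byte[]> producer) {
        Throwable error = flushError.get();
        if (error != null) {
            // A flush error (e.g. a serialization failure) was recorded:
            // do not commit the producer transaction.
            throw new ConnectException("Failed to flush offsets; not committing the transaction", error);
        }
        producer.commitTransaction();
    }
}
```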
[jira] [Commented] (KAFKA-14337) topic name with "." cannot be created after deletion
[ https://issues.apache.org/jira/browse/KAFKA-14337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17624931#comment-17624931 ] thanhnd96 commented on KAFKA-14337: --- Hi admin, thank you for the fix. After I applied the change from [GitHub Pull Request #12790|https://github.com/apache/kafka/pull/12790], it worked with the Kafka source download: [kafka-3.3.1-src.tgz|https://downloads.apache.org/kafka/3.3.1/kafka-3.3.1-src.tgz]. But can the fix also be applied to the binary download, Scala 2.13 - [kafka_2.13-3.3.1.tgz|https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz] ([asc|https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz.asc], [sha512|https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz.sha512]), on my cluster? I deploy 3 Kafka brokers running in KRaft mode. Best regards! > topic name with "." cannot be created after deletion > > > Key: KAFKA-14337 > URL: https://issues.apache.org/jira/browse/KAFKA-14337 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.3.1 >Reporter: thanhnd96 >Assignee: Luke Chen >Priority: Critical > > Hi admin, > My issue: after I create topics like topic.AAA or Topic.AAA.01 and then delete one of the two, > I can't re-create the deleted topic. > But if I create topic test123, delete it, and re-create it, that works fine. > This is the log from trying to create topic.AAA: > WARN [Controller 1] createTopics: failed with unknown server exception NoSuchElementException at epoch 14 in 193 us. Renouncing leadership and reverting to the last committed offset 28. (org.apache.kafka.controller.QuorumController) > java.util.NoSuchElementException > at org.apache.kafka.timeline.SnapshottableHashTable$CurrentIterator.next(SnapshottableHashTable.java:167) > at org.apache.kafka.timeline.SnapshottableHashTable$CurrentIterator.next(SnapshottableHashTable.java:139) > at org.apache.kafka.timeline.TimelineHashSet$ValueIterator.next(TimelineHashSet.java:120) > at org.apache.kafka.controller.ReplicationControlManager.validateNewTopicNames(ReplicationControlManager.java:799) > at org.apache.kafka.controller.ReplicationControlManager.createTopics(ReplicationControlManager.java:567) > at org.apache.kafka.controller.QuorumController.lambda$createTopics$7(QuorumController.java:1832) > at org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:767) > at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121) > at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200) > at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173) > at java.base/java.lang.Thread.run(Thread.java:829) > ERROR [Controller 1] processBrokerHeartbeat: unable to start processing because of NotControllerException. (org.apache.kafka.controller.QuorumController) > > I'm running Kafka in KRaft mode. > Thanks, admin. -- This message was sent by Atlassian Jira (v8.20.10#820010)
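For anyone trying to reproduce the report above, here is a minimal sketch using the Java Admin client. The bootstrap address, partition count, and replication factor are assumptions; the reported failure is specific to KRaft-mode clusters on 3.3.1.

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;

public class DotTopicRecreateRepro {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local KRaft cluster
        try (Admin admin = Admin.create(props)) {
            NewTopic topic = new NewTopic("topic.AAA", 1, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
            admin.deleteTopics(Collections.singleton("topic.AAA")).all().get();
            // On affected KRaft clusters this second create reportedly fails with the
            // NoSuchElementException shown in the controller log above.
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```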
[jira] [Commented] (KAFKA-13419) sync group failed with rebalanceInProgress error might cause out-of-date ownedPartition in Cooperative protocol
[ https://issues.apache.org/jira/browse/KAFKA-13419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17624934#comment-17624934 ] A. Sophie Blee-Goldman commented on KAFKA-13419: Cool, thanks for confirming this is already handled in the assignor. Ok then, that said – or actually, _because_ of this, I do agree that we should revert https://issues.apache.org/jira/browse/KAFKA-13891. Maybe we can look into the logs to better understand the original problem you were experiencing [~aiquestion] – or were you not personally hitting the issue "fixed" by https://issues.apache.org/jira/browse/KAFKA-13891, just vigilant about submitting a patch when you noticed we had claimed to make that change in [https://github.com/apache/kafka/pull/11451] but seemingly forgot? > sync group failed with rebalanceInProgress error might cause out-of-date > ownedPartition in Cooperative protocol > --- > > Key: KAFKA-13419 > URL: https://issues.apache.org/jira/browse/KAFKA-13419 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.0.0 >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > Fix For: 3.1.0 > > > In KAFKA-13406, we found there's user got stuck when in rebalancing with > cooperative sticky assignor. The reason is the "ownedPartition" is > out-of-date, and it failed the cooperative assignment validation. > Investigate deeper, I found the root cause is we didn't reset generation and > state after sync group fail. In KAFKA-12983, we fixed the issue that the > onJoinPrepare is not called in resetStateAndRejoin method. And it causes the > ownedPartition not get cleared. But there's another case that the > ownedPartition will be out-of-date. Here's the example: > # consumer A joined and synced group successfully with generation 1 > # New rebalance started with generation 2, consumer A joined successfully, > but somehow, consumer A doesn't send out sync group immediately > # other consumer completed sync group successfully in generation 2, except > consumer A. > # After consumer A send out sync group, the new rebalance start, with > generation 3. So consumer A got REBALANCE_IN_PROGRESS error with sync group > response > # When receiving REBALANCE_IN_PROGRESS, we re-join the group, with > generation 3, with the assignment (ownedPartition) in generation 1. > # So, now, we have out-of-date ownedPartition sent, with unexpected results > happened > > We might want to do *resetStateAndRejoin* when *RebalanceInProgressException* > errors happend in *sync group*. Because when we got sync group error, it > means, join group passed, and other consumers (and the leader) might already > completed this round of rebalance. The assignment distribution this consumer > have is already out-of-date. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
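As a side note for readers following the generation/ownedPartitions discussion, the toy model below (not Kafka client code; all names are invented) shows why re-joining at a newer generation without clearing state re-advertises a stale assignment, which is what resetStateAndRejoin is meant to prevent.

```java
import java.util.Collections;
import java.util.List;

public class StaleOwnedPartitionsModel {
    static final class Member {
        int generation = 1;
        List<String> ownedPartitions = List.of("topic-0", "topic-1"); // assignment from generation 1

        // Models the choice discussed above for a REBALANCE_IN_PROGRESS sync-group error.
        void rejoin(int newGeneration, boolean resetStateFirst) {
            if (resetStateFirst) {
                ownedPartitions = Collections.emptyList(); // proposed fix: clear before re-joining
            }
            generation = newGeneration;
        }
    }

    public static void main(String[] args) {
        Member a = new Member();
        a.rejoin(3, false);
        // Without the reset, the generation-1 assignment is re-advertised at generation 3.
        System.out.println("generation=" + a.generation + ", ownedPartitions=" + a.ownedPartitions);
    }
}
```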
[jira] [Assigned] (KAFKA-10149) Do not prevent automatic preferred election when reassignment in progress
[ https://issues.apache.org/jira/browse/KAFKA-10149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton reassigned KAFKA-10149: - Assignee: Shenglong Zhang > Do not prevent automatic preferred election when reassignment in progress > - > > Key: KAFKA-10149 > URL: https://issues.apache.org/jira/browse/KAFKA-10149 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Shenglong Zhang >Priority: Major > Fix For: 3.4.0 > > > Currently the controller will not do preferred leader elections automatically > when a reassignment is in progress. If a broker crashes or is restarted with > a reassignment in progress, this leaves the leadership massively skewed until > the reassignment completes. I am not sure if there is a good reason for this, > but it seems not ideal. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-13419) sync group failed with rebalanceInProgress error might cause out-of-date ownedPartition in Cooperative protocol
[ https://issues.apache.org/jira/browse/KAFKA-13419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17624934#comment-17624934 ] A. Sophie Blee-Goldman edited comment on KAFKA-13419 at 10/27/22 7:56 AM: -- Cool, thanks for confirming this is already handled in the assignor. Ok then, that said – or actually, _because_ of this, I do agree that we should revert https://issues.apache.org/jira/browse/KAFKA-13891. Maybe we can look into the logs to better understand the original problem you were experiencing [~aiquestion] – or were you not personally hitting the issue "fixed" by https://issues.apache.org/jira/browse/KAFKA-13891, just vigilant about submitting a patch when you noticed we had mentioned making this change in [https://github.com/apache/kafka/pull/11451] but then apparently left it out? was (Author: ableegoldman): Cool, thanks for confirming this is already handled in the assignor. Ok then, that said – or actually, _because_ of this, I do agree that we should revert https://issues.apache.org/jira/browse/KAFKA-13891. Maybe we can look into the logs to better understand the original problem you were experiencing [~aiquestion] – or were you not personally hitting the issue "fixed" by https://issues.apache.org/jira/browse/KAFKA-13891, just vigilant about submitting a patch when you noticed we had claimed to make that change in [https://github.com/apache/kafka/pull/11451] but seemingly forgot? > sync group failed with rebalanceInProgress error might cause out-of-date > ownedPartition in Cooperative protocol > --- > > Key: KAFKA-13419 > URL: https://issues.apache.org/jira/browse/KAFKA-13419 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.0.0 >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > Fix For: 3.1.0 > > > In KAFKA-13406, we found there's user got stuck when in rebalancing with > cooperative sticky assignor. The reason is the "ownedPartition" is > out-of-date, and it failed the cooperative assignment validation. > Investigate deeper, I found the root cause is we didn't reset generation and > state after sync group fail. In KAFKA-12983, we fixed the issue that the > onJoinPrepare is not called in resetStateAndRejoin method. And it causes the > ownedPartition not get cleared. But there's another case that the > ownedPartition will be out-of-date. Here's the example: > # consumer A joined and synced group successfully with generation 1 > # New rebalance started with generation 2, consumer A joined successfully, > but somehow, consumer A doesn't send out sync group immediately > # other consumer completed sync group successfully in generation 2, except > consumer A. > # After consumer A send out sync group, the new rebalance start, with > generation 3. So consumer A got REBALANCE_IN_PROGRESS error with sync group > response > # When receiving REBALANCE_IN_PROGRESS, we re-join the group, with > generation 3, with the assignment (ownedPartition) in generation 1. > # So, now, we have out-of-date ownedPartition sent, with unexpected results > happened > > We might want to do *resetStateAndRejoin* when *RebalanceInProgressException* > errors happend in *sync group*. Because when we got sync group error, it > means, join group passed, and other consumers (and the leader) might already > completed this round of rebalance. The assignment distribution this consumer > have is already out-of-date. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] mdedetrich commented on a diff in pull request #12781: KAFKA-14132; Replace EasyMock with Mockito in KafkaBasedLogTest
mdedetrich commented on code in PR #12781: URL: https://github.com/apache/kafka/pull/12781#discussion_r1006531902 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java: ## @@ -135,8 +161,7 @@ public class KafkaBasedLogTest { @SuppressWarnings("unchecked") @Before public void setUp() { -store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"}, -TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, consumedCallback, time, initializer); +store = new MockedKafkaBasedLog(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, () -> null, consumedCallback, time, null); Review Comment: > I think the @Mock private Runnable initializer is the incorrect signature here, and should be changed to be a Consumer mock. Otherwise the initializer value isn't behaving like a mock for the mocked kafka based log. So it appears that the `@Mock private Runnable initializer` is being used correctly in the `setupWithAdmin` method and if you look at `setupWithAdmin` you can confirm its correct because we aren't using a mock, instead we actually using the raw constructor, i.e. ```java private void setupWithAdmin() { Supplier adminSupplier = () -> admin; java.util.function.Consumer initializer = admin -> { }; store = new MockedKafkaBasedLog(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer); } ``` I believe the oversight was just in the `setUp` of the mock. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
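If it helps to visualize the change being suggested in that thread, one possible shape of the corrected setup is sketched below; the identifiers come from the quoted KafkaBasedLogTest, and the generic type TopicAdmin is an assumption rather than something stated in the diff.

```java
// Field changed from Runnable to the functional type the constructor actually expects:
@Mock
private java.util.function.Consumer<TopicAdmin> initializer;

@Before
public void setUp() {
    // Pass the mocked initializer instead of null so expectStart() can verify it later.
    store = new MockedKafkaBasedLog(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS,
            () -> null, consumedCallback, time, initializer);
}
```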
[GitHub] [kafka] sudeshwasnik commented on a diff in pull request #12784: MINOR: Do not perform prodcuerCommit on serializationError when trying offsetWriter flush
sudeshwasnik commented on code in PR #12784: URL: https://github.com/apache/kafka/pull/12784#discussion_r1006513264 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java: ## @@ -280,15 +282,16 @@ private void commitTransaction() { // Commit the transaction // Blocks until all outstanding records have been sent and ack'd -try { -producer.commitTransaction(); -} catch (Throwable t) { -log.error("{} Failed to commit producer transaction", ExactlyOnceWorkerSourceTask.this, t); -flushError.compareAndSet(null, t); +if (flushFuture != null) { Review Comment: Started with the same thought, but tried making it consistent with [WorkerSourceTask](https://github.com/apache/kafka/blob/3ae1afa43838066e44ea78918050c6780c208042/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L253-L267). If there are no offsets and no errors, we should still commit (producer and task) +1 I'll make the suggested change! thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] shirenchuang opened a new pull request, #12793: [KAFKA-14328]: KafkaAdminClient should be Changing the exception level …
shirenchuang opened a new pull request, #12793: URL: https://github.com/apache/kafka/pull/12793 [ISSUE: KAFKA-14328](https://issues.apache.org/jira/browse/KAFKA-14328) PR: return the real error where we currently return only the timeout error. In the pending-call timeout scenario, record the exception from the last retry and print it when the call times out. This helps users locate problems. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
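A generic sketch of the idea in the PR description follows; the class and method names are made up and are not the real KafkaAdminClient internals. The point is to remember the most recent retriable failure so the eventual timeout carries the real cause.

```java
import org.apache.kafka.common.errors.TimeoutException;

public class PendingCallSketch {
    private Throwable lastRetryFailure;

    public void onRetriableFailure(Throwable t) {
        // e.g. LeaderNotAvailableException while a leader election is in progress
        lastRetryFailure = t;
    }

    public void failOnTimeout(String callName) {
        // Attach the last real failure as the cause instead of throwing a bare timeout.
        throw new TimeoutException("Timed out waiting for a node assignment. Call: " + callName,
                lastRetryFailure);
    }
}
```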
[GitHub] [kafka] mdedetrich commented on a diff in pull request #12781: KAFKA-14132; Replace EasyMock with Mockito in KafkaBasedLogTest
mdedetrich commented on code in PR #12781: URL: https://github.com/apache/kafka/pull/12781#discussion_r1006543034 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java: ## @@ -547,32 +538,18 @@ public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() throws Exceptio private void setupWithAdmin() { Supplier adminSupplier = () -> admin; java.util.function.Consumer initializer = admin -> { }; -store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"}, -TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer); +store = new MockedKafkaBasedLog(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer); } -private void expectProducerAndConsumerCreate() throws Exception { -PowerMock.expectPrivate(store, "createProducer") - .andReturn(producer); -PowerMock.expectPrivate(store, "createConsumer") - .andReturn(consumer); -} - -private void expectStart() throws Exception { +private void expectStart() { initializer.run(); -EasyMock.expectLastCall().times(1); - -expectProducerAndConsumerCreate(); +verify(initializer, times(1)).run(); } private void expectStop() { producer.close(); -PowerMock.expectLastCall(); +verify(producer, times(1)).close(); Review Comment: > Does the producer.close() call trivially satisfy the following verify() call? So to be honest I don't know whats going on here. The literal translation of ```java producer.close(); PowerMock.expectLastCall(); ``` to Mockito is ```java producer.close(); verify(producer, times(1)).close(); ``` So to answer your question this does seem fishy because of course we call `producer.close()` we expect it to be called unless some exception is thrown (which would fail the test otherwise). > Also it looks like this method is not really translated from easymock yet, as it is separate from the store.stop() calls throughout the test. I think that instead of calling store.stop(), the test should call expectStop, and then the expectStop calls store.stop and asserts that the shutdown procedure happens. Are you alluding to the fact that the equivalent `PowerMock.verifyAll();` is missing here? If thats so then yes this is indeed the case and this is one of the problems with PowerMock/EasyMock is that its not that explicit, i.e. `PowerMock.verifyAll()` will verify methods that are directly called (which almost all of the time is pointless) where as ideally you are meant to use `verify` on methods that are called indirectly called by tests to ensure that they are being executed. So I guess to answer your test more directly, I can see what you are saying but I would like to know what exactly a "shutdown procedure" actually means in terms of mocking, i.e. what methods (aside from `store.stop()` which we directly call) should we be verifying? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mdedetrich commented on a diff in pull request #12781: KAFKA-14132; Replace EasyMock with Mockito in KafkaBasedLogTest
mdedetrich commented on code in PR #12781: URL: https://github.com/apache/kafka/pull/12781#discussion_r1006543034 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java: ## @@ -547,32 +538,18 @@ public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() throws Exceptio private void setupWithAdmin() { Supplier adminSupplier = () -> admin; java.util.function.Consumer initializer = admin -> { }; -store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"}, -TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer); +store = new MockedKafkaBasedLog(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer); } -private void expectProducerAndConsumerCreate() throws Exception { -PowerMock.expectPrivate(store, "createProducer") - .andReturn(producer); -PowerMock.expectPrivate(store, "createConsumer") - .andReturn(consumer); -} - -private void expectStart() throws Exception { +private void expectStart() { initializer.run(); -EasyMock.expectLastCall().times(1); - -expectProducerAndConsumerCreate(); +verify(initializer, times(1)).run(); } private void expectStop() { producer.close(); -PowerMock.expectLastCall(); +verify(producer, times(1)).close(); Review Comment: > Does the producer.close() call trivially satisfy the following verify() call? To be honest, I don't know what's going on here. The literal translation of ```java producer.close(); PowerMock.expectLastCall(); ``` to Mockito is ```java producer.close(); verify(producer, times(1)).close(); ``` To answer your question, this does seem fishy: of course if we call `producer.close()` we expect it to have been called, unless some exception is thrown (which would fail the test anyway). > Also it looks like this method is not really translated from easymock yet, as it is separate from the store.stop() calls throughout the test. I think that instead of calling store.stop(), the test should call expectStop, and then the expectStop calls store.stop and asserts that the shutdown procedure happens. Are you alluding to the fact that the equivalent `PowerMock.verifyAll();` is missing at the end of the test? If so, then yes, this is indeed the case, and one of the problems with PowerMock/EasyMock is that it's not that explicit: `PowerMock.verifyAll()` will verify methods that are directly called (which is almost always pointless), whereas ideally you are meant to use `verify` on methods that are indirectly called by tests to ensure that they are actually executed. So to answer your point more directly: I can see what you are saying, but I would like to know what exactly a "shutdown procedure" means in terms of mocking, i.e. which methods (aside from `store.stop()`, which we call directly) should we be verifying? If there aren't any indirect methods that need to be verified with `.verify`, then unless I am missing something nothing really needs to be done here. As long as `.stop` executes without throwing an exception, that is the best we can do? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
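For what it's worth, a hedged sketch of what a translated stop-path check could look like; identifiers come from the quoted test, and only the producer.close() expectation is taken from the original expectStop, with other collaborators deliberately left out rather than guessed at.

```java
@Test
public void shouldCloseProducerOnStop() {
    store.start();
    store.stop();
    // The indirect call that expectStop() was meant to assert on.
    verify(producer).close();
}
```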
[jira] [Commented] (KAFKA-14328) KafkaAdminClient should be Changing the exception level When an exception occurs
[ https://issues.apache.org/jira/browse/KAFKA-14328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17624947#comment-17624947 ] shizhenzhen commented on KAFKA-14328: - PR: https://github.com/apache/kafka/pull/12793 > KafkaAdminClient should be Changing the exception level When an exception > occurs > > > Key: KAFKA-14328 > URL: https://issues.apache.org/jira/browse/KAFKA-14328 > Project: Kafka > Issue Type: Improvement > Components: admin >Affects Versions: 3.3 >Reporter: shizhenzhen >Priority: Major > Attachments: image-2022-10-21-11-19-21-064.png, > image-2022-10-21-14-56-31-753.png, image-2022-10-21-16-54-40-588.png, > image-2022-10-21-16-56-45-448.png, image-2022-10-21-16-58-19-353.png, > image-2022-10-24-14-28-10-365.png, image-2022-10-24-14-47-30-641.png, > image-2022-10-24-14-48-27-907.png > > > Some of KafkaAdminClient's logging is entirely at log.trace level. When an exception occurs there is no way to know the cause, which makes troubleshooting very difficult. > > For example, when sending a Metadata request, if a queried topic has a partition with Leader=-1, an exception is thrown; > > but at that point the exception is effectively swallowed. After it is thrown upward, it reaches the catch block shown in the second screenshot below. > The catch block puts the request back into the request queue, and the client falls into endless retries until the timeout is reached and throws: Timed out waiting for a node assignment. Call: metadata > > "Unable to assign a node to the Metadata request" - under normal circumstances, who would know that the real underlying exception is actually > > ``` > org.apache.kafka.common.errors.LeaderNotAvailableException: There is no leader for this topic-partition as we are in the middle of a leadership election. > > ``` > > !https://user-images.githubusercontent.com/10442648/196944422-e11b732f-6f7f-4f77-8d9c-1f0544257461.png! > > The screenshot below shows the log after I changed it to warn level. > !image-2022-10-21-11-19-21-064.png! > > So I hope the log.trace here can be changed to log.warn, as a hint. > It would make clear that a retry may currently be happening because of some exception. > > !image-2022-10-21-14-56-31-753.png! > -- This message was sent by Atlassian Jira (v8.20.10#820010)
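A hedged illustration of the requested change (the class and method below are illustrative, not the actual KafkaAdminClient call site): log the reason a call is being re-queued at WARN instead of TRACE, so the real cause is visible before the generic timeout error.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RetryLoggingSketch {
    private static final Logger log = LoggerFactory.getLogger(RetryLoggingSketch.class);

    void requeueCall(Object call, Throwable cause) {
        // Previously logged at trace, which hid e.g. LeaderNotAvailableException
        // behind the later "Timed out waiting for a node assignment" error.
        log.warn("{} failed, putting the request back into the queue to retry", call, cause);
    }
}
```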
[jira] [Commented] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance
[ https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17624948#comment-17624948 ] A. Sophie Blee-Goldman commented on KAFKA-14016: Hey, I realize there's a lot of history leading up to this issue and the associated "fix", so forgive me for missing anything while I'm getting up to speed – but taking a step back, before we jump into the discussion about alternative fixes for KAFKA-13891, can we flesh out the actual underlying problem & take stock of what symptoms people have actually seen vs theorized about? Sorry for pushing back on this, it just seems like we've been playing whack-a-mole with these rebalancing issues lately, and the moles have started to whack us back. I just want to encourage us all to approach this carefully so we're not having the exact same conversation and reaching for yet more alternative fixes by the next release. [~aiquestion] I guess this question is mostly directed at you, as the original reporter of KAFKA-13891: were you able to reproduce this or did you experience it in a real application, or was this mainly filed to follow up on the suggestion for it in KAFKA-13419? Sorry to repeat a bit of the conversation over on KAFKA-13419, but for full context here I was curious about the actual symptoms of KAFKA-13891 vs the scenario outlined in that ticket. Specifically, I think we need to expand and/or elaborate on the effect of having an old (but valid) generation when the consumer is still the most recent owner to claim its given partitions. The sticky assignment algorithm should account for this and IIRC will basically consider whoever has the highest valid generation for a partition as its previous owner, which in this case would always be the consumer that received this REBALANCE_IN_PROGRESS error. To summarize, basically I'm not convinced that failing to reset the generation has any impact on the assignor in this case, i.e. that there is an actual bug/problem described in KAFKA-13891 – in fact, forcing the consumer to reset everything when a rebalance is restarted seems actively detrimental, as evidenced by this exact ticket. I would vote for just reverting the changes we made for that and call it a day until/unless we see concrete evidence/symptoms that are impacting a real application. [~showuon] / [~aiquestion] / [~guozhang] Thoughts? > Revoke more partitions than expected in Cooperative rebalance > - > > Key: KAFKA-14016 > URL: https://issues.apache.org/jira/browse/KAFKA-14016 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.3.0 >Reporter: Shawn Wang >Priority: Major > Labels: new-rebalance-should-fix > > In https://issues.apache.org/jira/browse/KAFKA-13419 we found that some consumers didn't reset generation and state after a sync group failure with a REBALANCE_IN_PROGRESS error. > So we fixed it by resetting the generationId (but not the memberId) when sync group fails with a REBALANCE_IN_PROGRESS error. > But this change missed the reset part, so another change made in https://issues.apache.org/jira/browse/KAFKA-13891 makes this work. > After applying this change, we found that sometimes a consumer will revoke almost 2/3 of its partitions with cooperative rebalancing enabled, because if one consumer does a very quick re-join, other consumers will get REBALANCE_IN_PROGRESS in syncGroup and revoke their partitions before re-joining.
example: > # consumers A1-A10 (ten consumers) joined and synced the group successfully with generation 1 > # new consumer B1 joined and started a rebalance > # all consumers joined successfully, and then A1 needed to revoke a partition to transfer it to B1 > # A1 did a very quick syncGroup and re-join, because it revoked a partition > # A2-A10 didn't send syncGroup before A1 re-joined, so after they send syncGroup they will get REBALANCE_IN_PROGRESS > # A2-A10 will revoke their partitions and re-join > So in this rebalance almost every partition is revoked, which greatly decreases the benefit of cooperative rebalance. > I think instead of "{*}resetStateAndRejoin{*} when *RebalanceInProgressException* errors happen in {*}sync group{*}" we need another way to fix it. > Here is my proposal: > # revert the change in https://issues.apache.org/jira/browse/KAFKA-13891 > # in the server coordinator's handleSyncGroup, when the generationId has been checked and the group state is PreparingRebalance, we can send the assignment along with the error code REBALANCE_IN_PROGRESS (I think it's safe since we verified the generation first) > # when the client gets the REBALANCE_IN_PROGRESS error, try to apply the assignment first and then set rejoinNeeded = true to make it re-join immediately -- This message was sent by Atlassian Jira (v8.20.10#820010)
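A toy sketch of proposal step 3 above (all names are illustrative; this is not ConsumerCoordinator code): apply the generation-checked assignment that arrived with the REBALANCE_IN_PROGRESS error first, then re-join, instead of revoking everything.

```java
import java.nio.ByteBuffer;

public class SyncGroupProposalSketch {
    interface Rebalancer {
        void applyAssignment(ByteBuffer assignment); // keeps ownedPartitions current
        void requestRejoin(String reason);           // equivalent to rejoinNeeded = true
    }

    static void onRebalanceInProgress(Rebalancer rebalancer, ByteBuffer assignment) {
        if (assignment != null) {
            rebalancer.applyAssignment(assignment);
        }
        rebalancer.requestRejoin("rebalance already in progress on the broker");
    }
}
```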
[GitHub] [kafka] mdedetrich commented on a diff in pull request #12781: KAFKA-14132; Replace EasyMock with Mockito in KafkaBasedLogTest
mdedetrich commented on code in PR #12781: URL: https://github.com/apache/kafka/pull/12781#discussion_r1006531902 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java: ## @@ -135,8 +161,7 @@ public class KafkaBasedLogTest { @SuppressWarnings("unchecked") @Before public void setUp() { -store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"}, -TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, consumedCallback, time, initializer); +store = new MockedKafkaBasedLog(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, () -> null, consumedCallback, time, null); Review Comment: > I think the @Mock private Runnable initializer is the incorrect signature here, and should be changed to be a Consumer mock. Otherwise the initializer value isn't behaving like a mock for the mocked kafka based log. So it appears that the `@Mock private Runnable initializer` is being used correctly in the `setupWithAdmin` method and if you look at `setupWithAdmin` you can confirm this because we aren't using a mock, instead we actually using the raw constructor on an extended class, i.e. ```java private void setupWithAdmin() { Supplier adminSupplier = () -> admin; java.util.function.Consumer initializer = admin -> { }; store = new MockedKafkaBasedLog(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer); } ``` I believe the oversight was just in the `setUp` of the mock. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mdedetrich commented on a diff in pull request #12781: KAFKA-14132; Replace EasyMock with Mockito in KafkaBasedLogTest
mdedetrich commented on code in PR #12781: URL: https://github.com/apache/kafka/pull/12781#discussion_r1006560892 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java: ## @@ -547,32 +538,18 @@ public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() throws Exceptio private void setupWithAdmin() { Supplier adminSupplier = () -> admin; java.util.function.Consumer initializer = admin -> { }; -store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"}, -TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer); +store = new MockedKafkaBasedLog(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer); } -private void expectProducerAndConsumerCreate() throws Exception { -PowerMock.expectPrivate(store, "createProducer") - .andReturn(producer); -PowerMock.expectPrivate(store, "createConsumer") - .andReturn(consumer); -} - -private void expectStart() throws Exception { +private void expectStart() { initializer.run(); -EasyMock.expectLastCall().times(1); - -expectProducerAndConsumerCreate(); +verify(initializer, times(1)).run(); Review Comment: Also answered in https://github.com/apache/kafka/pull/12781#discussion_r1006543034 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #12784: MINOR: Do not perform prodcuerCommit on serializationError when trying offsetWriter flush
C0urante commented on PR #12784: URL: https://github.com/apache/kafka/pull/12784#issuecomment-1293192830 Thanks @sudeshwasnik, good catch! Would you mind filing a Jira for this so that others who encounter the same bug can track its status and fixed versions? Regarding the actual changes--I think we should expand unit testing coverage here. The tweak to `ExactlyOnceWorkerSourceTaskTest` does address the case of synchronous offset flush failures (i.e., ones detected before offsets are dispatched to the producer), but it drops coverage for the case of asynchronous offset flush failures (i.e., ones detected after offsets are dispatched to the producer). Can we account for both cases? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] shekhar-rajak commented on pull request #12777: Replace EasyMock and PowerMock with Mockito - TimeOrderedWindowStoreTest
shekhar-rajak commented on PR #12777: URL: https://github.com/apache/kafka/pull/12777#issuecomment-1293213497 @divijvaidya Please have a look now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mdedetrich commented on a diff in pull request #12781: KAFKA-14132; Replace EasyMock with Mockito in KafkaBasedLogTest
mdedetrich commented on code in PR #12781: URL: https://github.com/apache/kafka/pull/12781#discussion_r1006616025 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java: ## @@ -547,32 +538,18 @@ public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() throws Exceptio private void setupWithAdmin() { Supplier adminSupplier = () -> admin; java.util.function.Consumer initializer = admin -> { }; -store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"}, -TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer); +store = new MockedKafkaBasedLog(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer); } -private void expectProducerAndConsumerCreate() throws Exception { -PowerMock.expectPrivate(store, "createProducer") - .andReturn(producer); -PowerMock.expectPrivate(store, "createConsumer") - .andReturn(consumer); -} - -private void expectStart() throws Exception { +private void expectStart() { initializer.run(); -EasyMock.expectLastCall().times(1); - -expectProducerAndConsumerCreate(); +verify(initializer, times(1)).run(); } private void expectStop() { producer.close(); -PowerMock.expectLastCall(); +verify(producer, times(1)).close(); Review Comment: Okay so you can largely ignore my previous comment, I missed the `replayAll` which was being called elsewhere which means we are actually expecting `producer.close()` to be called indirectly. I did this quickly and realized that I also need to change the `initial` argument for the mock constructor in `setUp` (see https://github.com/apache/kafka/pull/12781#discussion_r1006531902) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mdedetrich commented on a diff in pull request #12781: KAFKA-14132; Replace EasyMock with Mockito in KafkaBasedLogTest
mdedetrich commented on code in PR #12781: URL: https://github.com/apache/kafka/pull/12781#discussion_r1006616339 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java: ## @@ -135,8 +161,7 @@ public class KafkaBasedLogTest { @SuppressWarnings("unchecked") @Before public void setUp() { -store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"}, -TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, consumedCallback, time, initializer); +store = new MockedKafkaBasedLog(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, () -> null, consumedCallback, time, null); Review Comment: See https://github.com/apache/kafka/pull/12781#discussion_r1006616025 for a solution -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #12789: KAFKA-13989: Errors while evaluating connector type should return UNKNOWN
C0urante commented on code in PR #12789: URL: https://github.com/apache/kafka/pull/12789#discussion_r1006589610 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ## @@ -700,6 +707,9 @@ public ConnectorType connectorTypeForClass(String connClass) { * @return the {@link ConnectorType} of the connector */ public ConnectorType connectorTypeForConfig(Map connConfig) { Review Comment: If we remove `connectorTypeForClass`, we can probably just rename this to `connectorType`. Also, the Javadoc states that `connConfig` may not be null. Are there cases where we anticipate that it will now be null? If so, we should update the Javadoc accordingly; if not, we should remove the null check. ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ## @@ -691,7 +691,14 @@ protected Connector getConnector(String connType) { * @param connClass class of the connector */ public ConnectorType connectorTypeForClass(String connClass) { Review Comment: Do we even need this separate method? It looks like it's only ever called in tests. Can we inline this logic into `connectorTypeForConfig` and tweak our tests to call that method instead? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
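For readers of this review, here is a sketch of the "never throw, return UNKNOWN" behaviour under discussion; the class-loading step and the method name below are simplifications and not the final AbstractHerder code, which resolves connectors through the plugin classloader.

```java
import java.util.Map;

import org.apache.kafka.connect.health.ConnectorType;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.source.SourceConnector;

public class ConnectorTypeSketch {
    public static ConnectorType connectorType(Map<String, String> connConfig) {
        if (connConfig == null) {
            return ConnectorType.UNKNOWN;
        }
        try {
            String connClass = connConfig.get("connector.class");
            Class<?> clazz = Class.forName(connClass); // simplification: plugin isolation omitted
            if (SourceConnector.class.isAssignableFrom(clazz)) {
                return ConnectorType.SOURCE;
            }
            if (SinkConnector.class.isAssignableFrom(clazz)) {
                return ConnectorType.SINK;
            }
            return ConnectorType.UNKNOWN;
        } catch (Exception e) {
            // Any error while evaluating the type is reported as UNKNOWN
            // instead of failing the REST request.
            return ConnectorType.UNKNOWN;
        }
    }
}
```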
[GitHub] [kafka] C0urante commented on a diff in pull request #12789: KAFKA-13989: Errors while evaluating connector type should return UNKNOWN
C0urante commented on code in PR #12789: URL: https://github.com/apache/kafka/pull/12789#discussion_r1006588134 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ## @@ -691,7 +691,14 @@ protected Connector getConnector(String connType) { * @param connClass class of the connector */ public ConnectorType connectorTypeForClass(String connClass) { Review Comment: Do we even need this separate method anymore? It looks like it's only ever called in tests. Can we inline this logic into `connectorTypeForConfig` and tweak our tests to call that method instead? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mdedetrich commented on a diff in pull request #12781: KAFKA-14132; Replace EasyMock with Mockito in KafkaBasedLogTest
mdedetrich commented on code in PR #12781: URL: https://github.com/apache/kafka/pull/12781#discussion_r1006616339 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java: ## @@ -135,8 +161,7 @@ public class KafkaBasedLogTest { @SuppressWarnings("unchecked") @Before public void setUp() { -store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"}, -TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, consumedCallback, time, initializer); +store = new MockedKafkaBasedLog(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, () -> null, consumedCallback, time, null); Review Comment: So I have resolved this in the `Fix expectStart and expectStop along with initializer` commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mdedetrich commented on pull request #12781: KAFKA-14132; Replace EasyMock with Mockito in KafkaBasedLogTest
mdedetrich commented on PR #12781: URL: https://github.com/apache/kafka/pull/12781#issuecomment-1293252697 @C0urante @gharris1727 Now that I understand what `expectStart` and `expectStop` were meant to do, I have fixed both this issue and the problem of `initializer` being of the wrong type in the `Fix expectStart and expectStop along with initializer` commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mdedetrich commented on a diff in pull request #12781: KAFKA-14132; Replace EasyMock with Mockito in KafkaBasedLogTest
mdedetrich commented on code in PR #12781: URL: https://github.com/apache/kafka/pull/12781#discussion_r1006635305 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java: ## @@ -115,10 +115,32 @@ public class KafkaBasedLogTest { private static final String TP1_VALUE_NEW = "VAL1_NEW"; private Time time = new MockTime(); -private KafkaBasedLog store; + +private class MockedKafkaBasedLog extends KafkaBasedLog { +public MockedKafkaBasedLog(String topic, + Map producerConfigs, + Map consumerConfigs, + Supplier topicAdminSupplier, + Callback> consumedCallback, + Time time, + Consumer initializer) { +super(topic, producerConfigs, consumerConfigs, topicAdminSupplier, consumedCallback, time, initializer); +} + +@Override +protected KafkaProducer createProducer() { +return producer; +} + +@Override +protected MockConsumer createConsumer() { +return consumer; +} +} +private MockedKafkaBasedLog store; @Mock -private Runnable initializer; +private Consumer initializer; Review Comment: As pointed out by @gharris1727 , the type of `initializer` had to be changed because it was in fact incorrect. I have no idea how this happened to work beforehand with EasyMock, but this appears to work alongside with changing `expectStart` to `verify(initializer).accept(any());` (since we have a `Supplier` and not a `Runnable`) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12777: Replace EasyMock and PowerMock with Mockito - TimeOrderedWindowStoreTest
divijvaidya commented on code in PR #12777: URL: https://github.com/apache/kafka/pull/12777#discussion_r1006648492 ## streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java: ## @@ -1166,58 +1164,43 @@ public void shouldNotThrowInvalidBackwardRangeExceptionWithNegativeFromKey() { @Test public void shouldCloseCacheAndWrappedStoreAfterErrorDuringCacheFlush() { setUpCloseTests(); -EasyMock.reset(cache); -cache.flush(CACHE_NAMESPACE); -EasyMock.expectLastCall().andThrow(new RuntimeException("Simulating an error on flush")); -cache.close(CACHE_NAMESPACE); -EasyMock.replay(cache); -EasyMock.reset(underlyingStore); -underlyingStore.close(); -EasyMock.replay(underlyingStore); +doThrow(new RuntimeException( +"Simulating an error on flush")) +.when(cache).flush(CACHE_NAMESPACE); +reset(underlyingStore); assertThrows(RuntimeException.class, cachingStore::close); -EasyMock.verify(cache, underlyingStore); +verifyAndTearDownCloseTests(); } @Test public void shouldCloseWrappedStoreAfterErrorDuringCacheClose() { setUpCloseTests(); -EasyMock.reset(cache); -cache.flush(CACHE_NAMESPACE); -cache.close(CACHE_NAMESPACE); -EasyMock.expectLastCall().andThrow(new RuntimeException("Simulating an error on close")); -EasyMock.replay(cache); -EasyMock.reset(underlyingStore); -underlyingStore.close(); -EasyMock.replay(underlyingStore); +doThrow(new RuntimeException("Simulating an error on close")) +.when(cache).close(CACHE_NAMESPACE); +reset(underlyingStore); Review Comment: do we need this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance
[ https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17625003#comment-17625003 ] Luke Chen commented on KAFKA-14016: --- [~ableegoldman], thanks for the comment. Yes, we have logic in the sticky assignor to protect against multiple consumers claiming the same partition in the same highest generation. The original thought was that the same logic didn't exist in custom assignors. But after [KIP-792|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336614] gets implemented and merged (which should happen in the next release, v3.4.0), we should not need to worry about it anymore. Therefore, for this issue, I'd also vote for just reverting the changes. Thanks. > Revoke more partitions than expected in Cooperative rebalance > - > > Key: KAFKA-14016 > URL: https://issues.apache.org/jira/browse/KAFKA-14016 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.3.0 >Reporter: Shawn Wang >Priority: Major > Labels: new-rebalance-should-fix > > In https://issues.apache.org/jira/browse/KAFKA-13419 we found that some consumers didn't reset generation and state after a sync group failure with a REBALANCE_IN_PROGRESS error. > So we fixed it by resetting the generationId (but not the memberId) when sync group fails with a REBALANCE_IN_PROGRESS error. > But this change missed the reset part, so another change made in https://issues.apache.org/jira/browse/KAFKA-13891 makes this work. > After applying this change, we found that sometimes a consumer will revoke almost 2/3 of its partitions with cooperative rebalancing enabled, because if one consumer does a very quick re-join, other consumers will get REBALANCE_IN_PROGRESS in syncGroup and revoke their partitions before re-joining. example: > # consumers A1-A10 (ten consumers) joined and synced the group successfully with generation 1 > # new consumer B1 joined and started a rebalance > # all consumers joined successfully, and then A1 needed to revoke a partition to transfer it to B1 > # A1 did a very quick syncGroup and re-join, because it revoked a partition > # A2-A10 didn't send syncGroup before A1 re-joined, so after they send syncGroup they will get REBALANCE_IN_PROGRESS > # A2-A10 will revoke their partitions and re-join > So in this rebalance almost every partition is revoked, which greatly decreases the benefit of cooperative rebalance. > I think instead of "{*}resetStateAndRejoin{*} when *RebalanceInProgressException* errors happen in {*}sync group{*}" we need another way to fix it. > Here is my proposal: > # revert the change in https://issues.apache.org/jira/browse/KAFKA-13891 > # in the server coordinator's handleSyncGroup, when the generationId has been checked and the group state is PreparingRebalance, we can send the assignment along with the error code REBALANCE_IN_PROGRESS (I think it's safe since we verified the generation first) > # when the client gets the REBALANCE_IN_PROGRESS error, try to apply the assignment first and then set rejoinNeeded = true to make it re-join immediately -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] C0urante commented on a diff in pull request #12791: KAFKA-14338: Use MockTime in RetryUtilTest to eliminate flakiness
C0urante commented on code in PR #12791: URL: https://github.com/apache/kafka/pull/12791#discussion_r100618 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java: ## @@ -50,16 +52,15 @@ public void setUp() throws Exception { @Test public void testSuccess() throws Exception { Mockito.when(mockCallable.call()).thenReturn("success"); -assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(100), 1)); +assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(100), 1, mockTime)); Mockito.verify(mockCallable, Mockito.times(1)).call(); } -// timeout the test after 1000ms if unable to complete within a reasonable time frame -@Test(timeout = 1000) +@Test public void testExhaustingRetries() throws Exception { Mockito.when(mockCallable.call()).thenThrow(new TimeoutException()); ConnectException e = assertThrows(ConnectException.class, Review Comment: Nit: while we're in the neighborhood, can we remove this unnecessary variable? Can just change the line to: ```java assertThrows(ConnectException.class, ``` ## connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java: ## @@ -116,16 +117,20 @@ public void testNoBackoffTimeAndSucceed() throws Exception { .thenThrow(new TimeoutException()) .thenReturn("success"); -assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, TIMEOUT, 0)); +assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(100), 0, mockTime)); Mockito.verify(mockCallable, Mockito.times(4)).call(); } @Test public void testNoBackoffTimeAndFail() throws Exception { -Mockito.when(mockCallable.call()).thenThrow(new TimeoutException("timeout exception")); +Mockito.when(mockCallable.call()).thenAnswer(invocation -> { +// Without any backoff time, the speed of the operation itself limits the number of retries and retry rate. +mockTime.sleep(30); +throw new TimeoutException("timeout exception"); +}); Review Comment: We can use the auto-tick feature of the `MockTime` class instead of adding in fake calls to `sleep` from our mock object. ```suggestion // Without backoff time, we never sleep; auto-tick helps move things along mockTime = new MockTime(10); Mockito.when(mockCallable.call()).thenThrow(new TimeoutException("timeout exception")); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
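As background for the auto-tick suggestion above, the `MockTime` test utility from kafka-clients advances its clock by a fixed interval on every read when constructed with an auto-tick value, so time-bounded retry loops make progress without real sleeps. A minimal standalone sketch (not the RetryUtilTest code itself):

```java
import org.apache.kafka.common.utils.MockTime;

public class MockTimeAutoTickDemo {
    public static void main(String[] args) {
        // Each read of the clock advances it by 10 ms.
        MockTime time = new MockTime(10);

        long first = time.milliseconds();
        long second = time.milliseconds();
        System.out.println("advanced by " + (second - first) + " ms"); // prints 10

        // Explicit sleeps also move the clock, deterministically and instantly.
        time.sleep(30);
        System.out.println("now at " + (time.milliseconds() - first) + " ms since first read");
    }
}
```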
[GitHub] [kafka] mdedetrich commented on a diff in pull request #12725: KAFKA-14132; Replace EasyMock with Mockito ConnectorsResourceTest
mdedetrich commented on code in PR #12725: URL: https://github.com/apache/kafka/pull/12725#discussion_r1006723063 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java: ## @@ -153,19 +157,23 @@ public class ConnectorsResourceTest { @Mock private WorkerConfig workerConfig; +private MockedStatic restClientStatic; + @Before public void setUp() throws NoSuchMethodException { -PowerMock.mockStatic(RestClient.class, -RestClient.class.getMethod("httpRequest", String.class, String.class, HttpHeaders.class, Object.class, TypeReference.class, WorkerConfig.class)); - EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).andReturn(true); - EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).andReturn(true); -PowerMock.replay(workerConfig); +restClientStatic = mockStatic(RestClient.class); + when(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).thenReturn(true); + when(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).thenReturn(true); connectorsResource = new ConnectorsResource(herder, workerConfig); -forward = EasyMock.mock(UriInfo.class); +forward = mock(UriInfo.class); MultivaluedMap queryParams = new MultivaluedHashMap<>(); queryParams.putSingle("forward", "true"); - EasyMock.expect(forward.getQueryParameters()).andReturn(queryParams).anyTimes(); -EasyMock.replay(forward); +when(forward.getQueryParameters()).thenReturn(queryParams); +} + +@After +public void teardown() { +restClientStatic.close(); Review Comment: Found the problem and solved in the `Add verifyNoMoreInteractions to tearDown` commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
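For readers following this setup/teardown discussion, the lifecycle of a Mockito static mock, including the `verifyNoMoreInteractions` check added in the follow-up commit, looks roughly like the sketch below. The `Endpoints` class is an invented stand-in for `RestClient`; this is not the ConnectorsResourceTest code.

```java
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mockStatic;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.MockedStatic;

public class StaticMockLifecycleExample {

    // Hypothetical class with a static method, standing in for RestClient.
    static class Endpoints {
        static String resolve(String name) { return "https://" + name; }
    }

    private MockedStatic<Endpoints> endpointsStatic;

    @Before
    public void setUp() {
        // The static mock is registered for the current thread and must be closed
        // after every test, otherwise later tests see the mocked behaviour leak through.
        endpointsStatic = mockStatic(Endpoints.class);
    }

    @After
    public void tearDown() {
        // Optionally assert that no unexpected static calls happened before
        // deregistering the mock.
        endpointsStatic.verifyNoMoreInteractions();
        endpointsStatic.close();
    }

    @Test
    public void resolvesFromMock() {
        endpointsStatic.when(() -> Endpoints.resolve("broker")).thenReturn("mocked");
        assertEquals("mocked", Endpoints.resolve("broker"));
        endpointsStatic.verify(() -> Endpoints.resolve("broker"));
    }
}
```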
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12784: MINOR: Do not perform prodcuerCommit on serializationError when trying offsetWriter flush
vamossagar12 commented on code in PR #12784: URL: https://github.com/apache/kafka/pull/12784#discussion_r1006758803 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java: ## @@ -280,16 +280,18 @@ private void commitTransaction() { // Commit the transaction // Blocks until all outstanding records have been sent and ack'd -try { -producer.commitTransaction(); -} catch (Throwable t) { -log.error("{} Failed to commit producer transaction", ExactlyOnceWorkerSourceTask.this, t); -flushError.compareAndSet(null, t); +Throwable error = flushError.get(); +if (error == null) { Review Comment: Can this be the else block of https://github.com/apache/kafka/pull/12784/files#diff-6073f350e3c1c4ca7334407b32d162012543dad6bcc048cc970f3d11e852e5f2L293? Seems like `transactionOpen = false;` is anyways being set in this case. WDYT? ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java: ## @@ -280,16 +280,18 @@ private void commitTransaction() { // Commit the transaction // Blocks until all outstanding records have been sent and ack'd -try { -producer.commitTransaction(); -} catch (Throwable t) { -log.error("{} Failed to commit producer transaction", ExactlyOnceWorkerSourceTask.this, t); -flushError.compareAndSet(null, t); +Throwable error = flushError.get(); +if (error == null) { +try { +producer.commitTransaction(); +} catch (Throwable t) { +log.error("{} Failed to commit producer transaction", ExactlyOnceWorkerSourceTask.this, t); +flushError.compareAndSet(null, t); +} } - transactionOpen = false; -Throwable error = flushError.get(); +error = flushError.get(); Review Comment: This call is redundant? `error` is already set to `flushError.get()` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on pull request #12784: MINOR: Do not perform prodcuerCommit on serializationError when trying offsetWriter flush
vamossagar12 commented on PR #12784: URL: https://github.com/apache/kafka/pull/12784#issuecomment-1293401822 Thanks @sudeshwasnik. This is a nice catch. I have requested a couple of cosmetic changes. And I agree with Chris, we might need to increase the test coverage. Currently the only test change in this PR is removing an expected call on `FAIL_FLUSH_CALLBACK`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] sudeshwasnik commented on a diff in pull request #12784: MINOR: Do not perform prodcuerCommit on serializationError when trying offsetWriter flush
sudeshwasnik commented on code in PR #12784: URL: https://github.com/apache/kafka/pull/12784#discussion_r1006763859 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java: ## @@ -280,16 +280,18 @@ private void commitTransaction() { // Commit the transaction // Blocks until all outstanding records have been sent and ack'd -try { -producer.commitTransaction(); -} catch (Throwable t) { -log.error("{} Failed to commit producer transaction", ExactlyOnceWorkerSourceTask.this, t); -flushError.compareAndSet(null, t); +Throwable error = flushError.get(); +if (error == null) { +try { +producer.commitTransaction(); +} catch (Throwable t) { +log.error("{} Failed to commit producer transaction", ExactlyOnceWorkerSourceTask.this, t); +flushError.compareAndSet(null, t); +} } - transactionOpen = false; -Throwable error = flushError.get(); +error = flushError.get(); Review Comment: So we want to check if producer.commitTransaction itself resulted in error, ref - line 289 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java: ## @@ -280,16 +280,18 @@ private void commitTransaction() { // Commit the transaction // Blocks until all outstanding records have been sent and ack'd -try { -producer.commitTransaction(); -} catch (Throwable t) { -log.error("{} Failed to commit producer transaction", ExactlyOnceWorkerSourceTask.this, t); -flushError.compareAndSet(null, t); +Throwable error = flushError.get(); +if (error == null) { +try { +producer.commitTransaction(); +} catch (Throwable t) { +log.error("{} Failed to commit producer transaction", ExactlyOnceWorkerSourceTask.this, t); +flushError.compareAndSet(null, t); +} } - transactionOpen = false; -Throwable error = flushError.get(); +error = flushError.get(); Review Comment: we want to check if producer.commitTransaction itself resulted in error, ref - line 289 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12784: MINOR: Do not perform prodcuerCommit on serializationError when trying offsetWriter flush
vamossagar12 commented on code in PR #12784: URL: https://github.com/apache/kafka/pull/12784#discussion_r1006767287 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java: ## @@ -280,16 +280,18 @@ private void commitTransaction() { // Commit the transaction // Blocks until all outstanding records have been sent and ack'd -try { -producer.commitTransaction(); -} catch (Throwable t) { -log.error("{} Failed to commit producer transaction", ExactlyOnceWorkerSourceTask.this, t); -flushError.compareAndSet(null, t); +Throwable error = flushError.get(); +if (error == null) { +try { +producer.commitTransaction(); +} catch (Throwable t) { +log.error("{} Failed to commit producer transaction", ExactlyOnceWorkerSourceTask.this, t); +flushError.compareAndSet(null, t); +} } - transactionOpen = false; -Throwable error = flushError.get(); +error = flushError.get(); Review Comment: Oh ok. Now I see that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] sudeshwasnik commented on a diff in pull request #12784: MINOR: Do not perform prodcuerCommit on serializationError when trying offsetWriter flush
sudeshwasnik commented on code in PR #12784: URL: https://github.com/apache/kafka/pull/12784#discussion_r1006768071 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java: ## @@ -280,16 +280,18 @@ private void commitTransaction() { // Commit the transaction // Blocks until all outstanding records have been sent and ack'd -try { -producer.commitTransaction(); -} catch (Throwable t) { -log.error("{} Failed to commit producer transaction", ExactlyOnceWorkerSourceTask.this, t); -flushError.compareAndSet(null, t); +Throwable error = flushError.get(); +if (error == null) { Review Comment: want to check for https://github.com/apache/kafka/pull/12784/files#diff-6073f350e3c1c4ca7334407b32d162012543dad6bcc048cc970f3d11e852e5f2R274 first here. If this was null, next error could be result of producer.commitTransaction itself, so we need to check for that error again. Error resulting from any of those should throw exception... which is what i'm intending to do. lmk if i am missing your suggesting 😅 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java: ## @@ -280,16 +280,18 @@ private void commitTransaction() { // Commit the transaction // Blocks until all outstanding records have been sent and ack'd -try { -producer.commitTransaction(); -} catch (Throwable t) { -log.error("{} Failed to commit producer transaction", ExactlyOnceWorkerSourceTask.this, t); -flushError.compareAndSet(null, t); +Throwable error = flushError.get(); +if (error == null) { Review Comment: want to check for https://github.com/apache/kafka/pull/12784/files#diff-6073f350e3c1c4ca7334407b32d162012543dad6bcc048cc970f3d11e852e5f2R274 first here. If this was null, next error could be result of producer.commitTransaction itself, so we need to check for that error again. Error resulting from any of those should throw exception... which is what i'm intending to do. lmk if i am missing your suggestion 😅 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
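To summarize the control flow this thread keeps referring to, here is a condensed, self-contained sketch (not the real ExactlyOnceWorkerSourceTask): the flush error is read once to decide whether the producer commit should be attempted at all, and read again afterwards so that a failure thrown by `commitTransaction()` itself is also surfaced; the second read corresponds to the "line 289" check mentioned above.

```java
import java.util.concurrent.atomic.AtomicReference;

public class CommitTransactionFlowSketch {

    // Minimal stand-in for the transactional producer.
    interface TransactionalProducer { void commitTransaction(); }

    private final AtomicReference<Throwable> flushError = new AtomicReference<>();
    private final TransactionalProducer producer;
    private boolean transactionOpen = true;

    CommitTransactionFlowSketch(TransactionalProducer producer) {
        this.producer = producer;
    }

    // Called by the offset writer callback when offset serialization fails.
    void recordFlushError(Throwable t) {
        flushError.compareAndSet(null, t);
    }

    void commitTransaction() {
        Throwable error = flushError.get();
        if (error == null) {
            try {
                // Only attempt the commit if offset flushing has not already failed,
                // e.g. with a serialization error.
                producer.commitTransaction();
            } catch (Throwable t) {
                flushError.compareAndSet(null, t);
            }
        }
        transactionOpen = false;

        // Second read: the commit attempt above may have recorded a new error.
        error = flushError.get();
        if (error != null) {
            // The real task also cancels the offset flush and records the failed
            // commit before rethrowing; those details are elided here.
            throw new RuntimeException("Failed to flush offsets and/or commit transaction", error);
        }
    }
}
```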
[GitHub] [kafka] viktorsomogyi commented on pull request #12671: KAFKA-14250: MirrorSourceTask exception causes the task to fail
viktorsomogyi commented on PR #12671: URL: https://github.com/apache/kafka/pull/12671#issuecomment-1293420143 @C0urante in fact this was an escalation that our customer brought to us; they were using a dedicated MM2 cluster. In the meantime, during another case with the customer, we figured out that [KAFKA-9851](https://issues.apache.org/jira/browse/KAFKA-9851) could have been the real cause, and the error message MM2 prints is just a red herring. Regardless, it would be good to improve the error message to state that the kind of failure experienced here won't persist across rebalances. What do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12784: MINOR: Do not perform prodcuerCommit on serializationError when trying offsetWriter flush
vamossagar12 commented on code in PR #12784: URL: https://github.com/apache/kafka/pull/12784#discussion_r1006791813 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java: ## @@ -280,16 +280,18 @@ private void commitTransaction() { // Commit the transaction // Blocks until all outstanding records have been sent and ack'd -try { -producer.commitTransaction(); -} catch (Throwable t) { -log.error("{} Failed to commit producer transaction", ExactlyOnceWorkerSourceTask.this, t); -flushError.compareAndSet(null, t); +Throwable error = flushError.get(); +if (error == null) { Review Comment: Yeah it makes sense now. I missed the part on line #289. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a diff in pull request #12771: KAFKA-14299: Handle TaskCorruptedException during initialization
ableegoldman commented on code in PR #12771: URL: https://github.com/apache/kafka/pull/12771#discussion_r1006792420 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -810,10 +810,19 @@ private void transitRestoredTaskToRunning(final Task task, } private void addTasksToStateUpdater() { +final Map taskExceptions = new LinkedHashMap<>(); for (final Task task : tasks.drainPendingTaskToInit()) { +try { +task.initializeIfNeeded(); +stateUpdater.add(task); +} catch (final RuntimeException e) { +// need to add task back to the bookkeeping to be handled by the stream thread +tasks.addTask(task); Review Comment: FWIW, w.r.t. the idea of attaching the entire task instead of just its taskId, I agree with Lucas here -- it seems risky, would likely end up being abused as things progress (by us, not the users), but most importantly, imo, it's just bad practice. I'm not too well versed in anti-patterns, I'll admit (perks & downsides of being a physics major 😉), but I would think this counts as one (or should) -- we shouldn't use exceptions to pass around actual objects rather than simple info/metadata. Especially objects of uncertain status, questionable thread safety, and significant resources attached. Just my two cents as someone who's coming into the restoration thread game very late -- given this fact I'm not particularly qualified to weigh in on what the _right_ thing to do is here, though again I feel like Lucas's suggestion to maintain a/the `TaskRegistry` as a central, source-of-truth type thing for tracking and organizing all the tasks is the right direction. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
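The direction the reviewers favor, collecting initialization failures keyed by task id rather than attaching live Task objects to exceptions, can be sketched roughly as below. The types are simplified stand-ins, so treat this as an illustration of the pattern rather than the TaskManager implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CollectInitFailuresSketch {

    // Simplified stand-in for a Streams task.
    interface Task {
        String id();
        void initializeIfNeeded();
    }

    /**
     * Initialize all pending tasks; failed tasks stay in the owning registry
     * (not shown) and only their ids travel inside the exception thrown at the end.
     */
    static void initializePendingTasks(Iterable<Task> pendingTasks) {
        final Map<String, RuntimeException> failures = new LinkedHashMap<>();
        for (final Task task : pendingTasks) {
            try {
                task.initializeIfNeeded();
            } catch (final RuntimeException e) {
                // Record the failure keyed by id instead of passing the task object around.
                failures.put(task.id(), e);
            }
        }
        if (!failures.isEmpty()) {
            // Surface all failed ids; chain the first cause for the stack trace.
            throw new RuntimeException(
                    "Failed to initialize tasks " + failures.keySet(),
                    failures.values().iterator().next());
        }
    }
}
```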
[GitHub] [kafka] ableegoldman commented on a diff in pull request #12773: KAFKA-14299: Return emptied ChangelogReader to ACTIVE_RESTORING
ableegoldman commented on code in PR #12773: URL: https://github.com/apache/kafka/pull/12773#discussion_r1006804068 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ## @@ -990,6 +991,10 @@ public void unregister(final Collection revokedChangelogs) { } removeChangelogsFromRestoreConsumer(revokedInitializedChangelogs); + +if (changelogs.isEmpty()) { +state = ChangelogReaderState.ACTIVE_RESTORING; Review Comment: Apologies if I'm missing something or my understanding of the restoration-related code is just completely out of date now, but do I understand correctly that this was/is a bug in the current (soon-to-be-old) architecture, and not related to the restoration thread work? Also, just curious, did you find this bug due to you or someone else hitting it, or was it just something you happened to notice wasn't quite right? If you/someone is actually running into this "in the wild" it may be a good idea to file a jira ticket for it, just to document that the bug exists, we're aware of it, and that it's fixed in whichever versions. ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java: ## @@ -1070,6 +1071,11 @@ public void shouldTransitState() { assertEquals(ACTIVE_RESTORING, changelogReader.state()); assertEquals(mkSet(tp, tp1, tp2), consumer.assignment()); assertEquals(mkSet(tp1, tp2), consumer.paused()); + Review Comment: Wow, there is a LOT going on in this test...encroaching on 180 lines, yikes. Could you split this out into its own test to make it more clear what is being tested/that we have coverage for this edge case -- it's hard to get a grasp on whether we're actually testing everything when it's all crammed into a single test. I know you didn't write this monolith to begin with, but we should strive always to improve on the sins of our ancestors 😉 More seriously though, covering multiple things in one test can be useful eg with integration tests where the setup & teardown alone take a significant chunk of time. But I would imagine/hope a unit test like this runs fairly quickly so it probably makes the most sense to optimize for comprehension Sorry for the slightly pedantic speech, if it's any consolation I bet Bruno would have said more or less the same thing lol -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #12783: KAFKA-14334: complete delayed purgatory after replication
dajac commented on code in PR #12783: URL: https://github.com/apache/kafka/pull/12783#discussion_r1006823247 ## core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala: ## @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package integration.kafka.server + +import kafka.server.{BaseFetchRequestTest, BrokerTopicStats, KafkaConfig} +import kafka.utils.{TestInfoUtils, TestUtils} +import org.apache.kafka.clients.admin.Admin +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.protocol.ApiKeys +import org.junit.jupiter.api.{AfterEach, BeforeEach, Timeout} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource + +import java.util.Properties + +class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest { + val numNodes = 2 + val numParts = 1 + val initialMessages = 100 + val nMessages = 100 + + val topic = "test-fetch-from-follower" + val leaderBrokerId = 0 + val followerBrokerId = 1 + var admin: Admin = null + + def overridingProps: Properties = { +val props = new Properties +props.put(KafkaConfig.NumPartitionsProp, numParts.toString) + +props + } + + override def generateConfigs: collection.Seq[KafkaConfig] = +TestUtils.createBrokerConfigs(numNodes, zkConnectOrNull, enableControlledShutdown = false, enableFetchFromFollower = true) + .map(KafkaConfig.fromProps(_, overridingProps)) + + @BeforeEach + def initializeFetchFromFollowerCluster(): Unit = { +// Create a 2 broker cluster where broker 0 is the leader and 1 is the follower. + +admin = TestUtils.createAdminClient(brokers, listenerName) +TestUtils.createTopicWithAdminRaw( + admin, + topic, + replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId)) +) +TestUtils.generateAndProduceMessages(brokers, topic, initialMessages) + } + + @AfterEach + def close(): Unit = { +if (admin != null) { + admin.close() +} + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + @Timeout(30) + def testFollowerCompleteDelayedPurgatoryOnReplication(quorum: String): Unit = { +TestUtils.generateAndProduceMessages(brokers, topic, nMessages) +// set fetch.max.wait.ms to a value (45 seconds) greater than the timeout (30 seconds) to ensure that the +// test only passes when the delayed fetch purgatory is completed after successfully replicating from the leader. 
+ +val totalMessages = initialMessages + nMessages +val topicPartition = new TopicPartition(topic, 0) +val offsetMap = Map[TopicPartition, Long]( + topicPartition -> (totalMessages - 1) Review Comment: I suppose that the request stays in the purgatory when you fetch offset 200 because there is no record at offset 200 yet and you don't produce anything after the fetch request is sent out. I think that your test does not really test what you want here. In my mind, you should do the following: * create the topic with no records. * fetch at offset 0 with a high max wait; the request should be placed in the purgatory. * produce one record. * the fetch request should then be completed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
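The sequence proposed in the comment above can be illustrated at the client level with the rough sketch below; the actual test drives raw fetch requests against the follower replica, which is elided here, and the bootstrap address and topic name are placeholders. The point is simply that a fetch against an empty topic parks in the purgatory until a produce completes it.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class DelayedFetchIllustration {
    public static void main(String[] args) throws Exception {
        String bootstrap = "localhost:9092";   // placeholder
        String topic = "delayed-fetch-demo";   // placeholder, starts empty

        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "delayed-fetch-demo-group");
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // A generous fetch.max.wait.ms keeps the broker-side fetch parked in the
        // purgatory until data arrives (or the wait expires).
        consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10_000);
        consumerProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            consumer.subscribe(Collections.singletonList(topic));

            // 1. Poll the empty topic: the fetch sits in the purgatory, nothing comes back.
            ConsumerRecords<String, String> before = consumer.poll(Duration.ofMillis(500));
            System.out.println("records before produce: " + before.count());

            // 2. Produce one record: the parked fetch should complete promptly,
            //    well before the full fetch.max.wait.ms elapses.
            producer.send(new ProducerRecord<>(topic, "key", "value")).get();
            ConsumerRecords<String, String> after = consumer.poll(Duration.ofSeconds(10));
            System.out.println("records after produce: " + after.count());
        }
    }
}
```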
[jira] [Created] (KAFKA-14339) producer commits-transaction even if offsetWriter faces serializationError
Sudesh Wasnik created KAFKA-14339: - Summary: producer commits-transaction even if offsetWriter faces serializationError Key: KAFKA-14339 URL: https://issues.apache.org/jira/browse/KAFKA-14339 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Sudesh Wasnik In ExactlyOnceWorkerSourceTask, producer.commitTransaction is performed even if offsetWriter faces a serialization error. This leads to no offsets being sent to the producer, but still trying to commit the transaction. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] dajac commented on a diff in pull request #12783: KAFKA-14334: complete delayed purgatory after replication
dajac commented on code in PR #12783: URL: https://github.com/apache/kafka/pull/12783#discussion_r1006833685 ## core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala: ## @@ -1100,6 +1100,66 @@ class ReplicaFetcherThreadTest { assertEquals(Collections.singletonList(tid1p0), fetchRequestBuilder2.removed()) } + @Test + def testFetchResponseWithPartitionData(): Unit = { +val appendInfo: LogAppendInfo = mock(classOf[LogAppendInfo]) +verifyLocalFetchCompletionAfterReplication(Some(appendInfo)) + } + + @Test + def testFetchResponseWithNoPartitionData(): Unit = { +verifyLocalFetchCompletionAfterReplication(appendInfo = None) + } + + private def verifyLocalFetchCompletionAfterReplication(appendInfo: Option[LogAppendInfo]): Unit = { +val props = TestUtils.createBrokerConfig(1, "localhost:1234") +val config = KafkaConfig.fromProps(props) + +val mockBlockingSend: BlockingSend = mock(classOf[BlockingSend]) +when(mockBlockingSend.brokerEndPoint()).thenReturn(brokerEndPoint) + +val log: UnifiedLog = mock(classOf[UnifiedLog]) + +val partition: Partition = mock(classOf[Partition]) +when(partition.localLogOrException).thenReturn(log) +when(partition.appendRecordsToFollowerOrFutureReplica(any[MemoryRecords], any[Boolean])).thenReturn(appendInfo) + +val replicaManager: ReplicaManager = mock(classOf[ReplicaManager]) + when(replicaManager.getPartitionOrException(any[TopicPartition])).thenReturn(partition) +val brokerTopicStats = new BrokerTopicStats +when(replicaManager.brokerTopicStats).thenReturn(brokerTopicStats) + +val replicaQuota: ReplicaQuota = mock(classOf[ReplicaQuota]) + +val thread = createReplicaFetcherThread( + name = "replica-fetcher", + fetcherId = 0, + brokerConfig = config, + failedPartitions = failedPartitions, + replicaMgr = replicaManager, + quota = replicaQuota, + leaderEndpointBlockingSend = mockBlockingSend) + +val tp0 = new TopicPartition("testTopic", 0) +val tp1 = new TopicPartition("testTopic", 1) +val records = MemoryRecords.withRecords(CompressionType.NONE, + new SimpleRecord(1000, "foo".getBytes(StandardCharsets.UTF_8))) +val partitionData: thread.FetchData = new FetchResponseData.PartitionData() + .setRecords(records) + +thread.processPartitionData(tp0, 0, partitionData.setPartitionIndex(0)) +thread.processPartitionData(tp1, 0, partitionData.setPartitionIndex(1)) +thread.doWork() +appendInfo match { + case Some(_) => +val argument: ArgumentCaptor[Seq[TopicPartition]] = ArgumentCaptor.forClass(classOf[Seq[TopicPartition]]) +verify(replicaManager, times(1)).completeDelayedFetchRequests(argument.capture()) Review Comment: I am a bit surprised by this. Isn't it possible to do something like `verify(replicaManager, times(1)).completeDelayedFetchRequests(Seq())`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #12783: KAFKA-14334: complete delayed purgatory after replication
dajac commented on code in PR #12783: URL: https://github.com/apache/kafka/pull/12783#discussion_r1006834496 ## core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala: ## @@ -1100,6 +1100,66 @@ class ReplicaFetcherThreadTest { assertEquals(Collections.singletonList(tid1p0), fetchRequestBuilder2.removed()) } + @Test + def testFetchResponseWithPartitionData(): Unit = { +val appendInfo: LogAppendInfo = mock(classOf[LogAppendInfo]) +verifyLocalFetchCompletionAfterReplication(Some(appendInfo)) + } + + @Test + def testFetchResponseWithNoPartitionData(): Unit = { +verifyLocalFetchCompletionAfterReplication(appendInfo = None) + } + + private def verifyLocalFetchCompletionAfterReplication(appendInfo: Option[LogAppendInfo]): Unit = { +val props = TestUtils.createBrokerConfig(1, "localhost:1234") +val config = KafkaConfig.fromProps(props) + +val mockBlockingSend: BlockingSend = mock(classOf[BlockingSend]) +when(mockBlockingSend.brokerEndPoint()).thenReturn(brokerEndPoint) + +val log: UnifiedLog = mock(classOf[UnifiedLog]) + +val partition: Partition = mock(classOf[Partition]) +when(partition.localLogOrException).thenReturn(log) +when(partition.appendRecordsToFollowerOrFutureReplica(any[MemoryRecords], any[Boolean])).thenReturn(appendInfo) + +val replicaManager: ReplicaManager = mock(classOf[ReplicaManager]) + when(replicaManager.getPartitionOrException(any[TopicPartition])).thenReturn(partition) +val brokerTopicStats = new BrokerTopicStats +when(replicaManager.brokerTopicStats).thenReturn(brokerTopicStats) + +val replicaQuota: ReplicaQuota = mock(classOf[ReplicaQuota]) + +val thread = createReplicaFetcherThread( + name = "replica-fetcher", + fetcherId = 0, + brokerConfig = config, + failedPartitions = failedPartitions, + replicaMgr = replicaManager, + quota = replicaQuota, + leaderEndpointBlockingSend = mockBlockingSend) + +val tp0 = new TopicPartition("testTopic", 0) +val tp1 = new TopicPartition("testTopic", 1) +val records = MemoryRecords.withRecords(CompressionType.NONE, + new SimpleRecord(1000, "foo".getBytes(StandardCharsets.UTF_8))) +val partitionData: thread.FetchData = new FetchResponseData.PartitionData() + .setRecords(records) + +thread.processPartitionData(tp0, 0, partitionData.setPartitionIndex(0)) +thread.processPartitionData(tp1, 0, partitionData.setPartitionIndex(1)) +thread.doWork() +appendInfo match { + case Some(_) => +val argument: ArgumentCaptor[Seq[TopicPartition]] = ArgumentCaptor.forClass(classOf[Seq[TopicPartition]]) +verify(replicaManager, times(1)).completeDelayedFetchRequests(argument.capture()) +assertEquals(Seq(tp0, tp1), argument.getValue) + case None => +verify(replicaManager, times(0)).completeDelayedFetchRequests(any[Seq[TopicPartition]]) Review Comment: nit: Could we verify that the set is actually empty here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #12783: KAFKA-14334: complete delayed purgatory after replication
dajac commented on code in PR #12783: URL: https://github.com/apache/kafka/pull/12783#discussion_r1006836805 ## core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala: ## @@ -1100,6 +1100,66 @@ class ReplicaFetcherThreadTest { assertEquals(Collections.singletonList(tid1p0), fetchRequestBuilder2.removed()) } + @Test + def testFetchResponseWithPartitionData(): Unit = { +val appendInfo: LogAppendInfo = mock(classOf[LogAppendInfo]) +verifyLocalFetchCompletionAfterReplication(Some(appendInfo)) + } + + @Test + def testFetchResponseWithNoPartitionData(): Unit = { +verifyLocalFetchCompletionAfterReplication(appendInfo = None) + } + + private def verifyLocalFetchCompletionAfterReplication(appendInfo: Option[LogAppendInfo]): Unit = { +val props = TestUtils.createBrokerConfig(1, "localhost:1234") +val config = KafkaConfig.fromProps(props) + +val mockBlockingSend: BlockingSend = mock(classOf[BlockingSend]) +when(mockBlockingSend.brokerEndPoint()).thenReturn(brokerEndPoint) + +val log: UnifiedLog = mock(classOf[UnifiedLog]) + +val partition: Partition = mock(classOf[Partition]) +when(partition.localLogOrException).thenReturn(log) +when(partition.appendRecordsToFollowerOrFutureReplica(any[MemoryRecords], any[Boolean])).thenReturn(appendInfo) + +val replicaManager: ReplicaManager = mock(classOf[ReplicaManager]) + when(replicaManager.getPartitionOrException(any[TopicPartition])).thenReturn(partition) +val brokerTopicStats = new BrokerTopicStats +when(replicaManager.brokerTopicStats).thenReturn(brokerTopicStats) + +val replicaQuota: ReplicaQuota = mock(classOf[ReplicaQuota]) + +val thread = createReplicaFetcherThread( + name = "replica-fetcher", + fetcherId = 0, + brokerConfig = config, + failedPartitions = failedPartitions, + replicaMgr = replicaManager, + quota = replicaQuota, + leaderEndpointBlockingSend = mockBlockingSend) + +val tp0 = new TopicPartition("testTopic", 0) +val tp1 = new TopicPartition("testTopic", 1) +val records = MemoryRecords.withRecords(CompressionType.NONE, + new SimpleRecord(1000, "foo".getBytes(StandardCharsets.UTF_8))) +val partitionData: thread.FetchData = new FetchResponseData.PartitionData() + .setRecords(records) Review Comment: nit: In the case where `appendInfo` is `None`, it is a tad awkward that we set records here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a diff in pull request #12772: KAFKA-14299: Avoid busy polling in state updater
ableegoldman commented on code in PR #12772: URL: https://github.com/apache/kafka/pull/12772#discussion_r1006838904 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ## @@ -353,19 +353,21 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, final Map restoreConsumerConfigs = config.getRestoreConsumerConfigs(getRestoreConsumerClientId(threadId)); final Consumer restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs); +final boolean stateUpdaterEnabled = +InternalConfig.getBoolean(config.originals(), InternalConfig.STATE_UPDATER_ENABLED, false); + final StoreChangelogReader changelogReader = new StoreChangelogReader( time, config, Review Comment: nit: we already pass the actual configs into the changelog reader, why not set the `stateUpdaterEnabled` flag inside the constructor/StoreChangelogReader class? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
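A minimal sketch of the reviewer's suggestion, deriving the flag inside the constructor from the config map it already receives instead of threading an extra boolean parameter through. The helper mirrors the `InternalConfig.getBoolean(...)` call shown in the diff, and the key string below is hypothetical.

```java
import java.util.Map;

public class FlagFromConfigSketch {

    // Hypothetical key name; the real one lives in InternalConfig.
    static final String STATE_UPDATER_ENABLED = "__state.updater.enabled__";

    // Stand-in for the InternalConfig.getBoolean helper used in the diff above.
    static boolean getBoolean(Map<String, Object> originals, String key, boolean defaultValue) {
        Object value = originals.get(key);
        return value == null ? defaultValue : Boolean.parseBoolean(value.toString());
    }

    private final boolean stateUpdaterEnabled;

    // Derive the flag inside the constructor from the configs that are already passed in.
    FlagFromConfigSketch(Map<String, Object> originals) {
        this.stateUpdaterEnabled = getBoolean(originals, STATE_UPDATER_ENABLED, false);
    }

    boolean stateUpdaterEnabled() {
        return stateUpdaterEnabled;
    }
}
```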
[GitHub] [kafka] dajac commented on a diff in pull request #12783: KAFKA-14334: complete delayed purgatory after replication
dajac commented on code in PR #12783: URL: https://github.com/apache/kafka/pull/12783#discussion_r1006845821 ## core/src/main/scala/kafka/server/ReplicaFetcherThread.scala: ## @@ -132,9 +141,17 @@ class ReplicaFetcherThread(name: String, brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes) +logAppendInfo.foreach { _ => partitionsWithNewRecords += topicPartition } Review Comment: I wonder if we could do slightly better than this. If you look at the produce path, we have the following code: ``` result.info.leaderHwChange match { case LeaderHwChange.Increased => // some delayed operations may be unblocked after HW changed delayedProducePurgatory.checkAndComplete(requestKey) delayedFetchPurgatory.checkAndComplete(requestKey) delayedDeleteRecordsPurgatory.checkAndComplete(requestKey) case LeaderHwChange.Same => // probably unblock some follower fetch requests since log end offset has been updated delayedFetchPurgatory.checkAndComplete(requestKey) case LeaderHwChange.None => // nothing } ``` I wonder if we should do something similar here. Intuitively, we are interested in the updated HWM and not so much in new records, right? What do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #12783: KAFKA-14334: complete delayed purgatory after replication
dajac commented on PR #12783: URL: https://github.com/apache/kafka/pull/12783#issuecomment-1293497553 @jeffkbkim Thanks for the update. I left a few more comments/suggestions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac closed pull request #12674: KAFKA-14255: Fetching from follower should be disallowed if fetch from follower is disabled
dajac closed pull request #12674: KAFKA-14255: Fetching from follower should be disallowed if fetch from follower is disabled URL: https://github.com/apache/kafka/pull/12674 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming commented on a diff in pull request #12790: KAFKA-14337: correctly remove topicsWithCollisionChars after topic deletion
dengziming commented on code in PR #12790: URL: https://github.com/apache/kafka/pull/12790#discussion_r1006976348 ## core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala: ## @@ -470,6 +471,30 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi TestUtils.verifyTopicDeletion(zkClientOrNull, testTopicName, 1, brokers) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testTopicWithCollidingCharDeletionAndCreateAgain(quorum: String): Unit = { +// create the topic with colliding chars +val topicWithCollidingChar = "test.a" +val createOpts = new TopicCommandOptions(Array("--partitions", "1", + "--replication-factor", "1", + "--topic", topicWithCollidingChar)) +createAndWaitTopic(createOpts) + +// delete the topic +val deleteOpts = new TopicCommandOptions(Array("--topic", topicWithCollidingChar)) + +if (!isKRaftTest()) { + val deletePath = DeleteTopicsTopicZNode.path(topicWithCollidingChar) + assertFalse(zkClient.pathExists(deletePath), "Delete path for topic shouldn't exist before deletion.") +} +topicService.deleteTopic(deleteOpts) +TestUtils.verifyTopicDeletion(zkClientOrNull, topicWithCollidingChar, 1, brokers) + +val createTopic: Executable = () => createAndWaitTopic(createOpts) +assertDoesNotThrow(createTopic) Review Comment: I'm investigating why there will be compile error when directly using `assertDoesNotThrow(() => createAndWaitTopic(createOpts))`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #12366: KAFKA-14021: Implement new KIP-618 APIs in MirrorSourceConnector
C0urante commented on PR #12366: URL: https://github.com/apache/kafka/pull/12366#issuecomment-1293697763 Thanks @OmniaGM, good idea. I've updated the README and added an integration test that verifies that MM2 can still run with exactly-once support enabled. I should note that the `testReplication` test case is currently failing locally with timeout issues, but succeeds when I bump the timeout. Going to see how the Jenkins build goes; we may choose to increase the timeout in these tests if they also fail during CI. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac closed pull request #12734: KAFKA-14255; Return an empty record instead of an OffsetOutOfRangeException when fetching from a follower without a leader epoch
dajac closed pull request #12734: KAFKA-14255; Return an empty record instead of an OffsetOutOfRangeException when fetching from a follower without a leader epoch URL: https://github.com/apache/kafka/pull/12734 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on a diff in pull request #12544: KAFKA-14098: Add meaningful default client IDs for Connect workers
mimaison commented on code in PR #12544: URL: https://github.com/apache/kafka/pull/12544#discussion_r1007088373 ## connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java: ## @@ -106,6 +108,7 @@ public Connect startConnect(Map workerProps) { // Create the admin client to be shared by all backing stores. Map adminProps = new HashMap<>(config.originals()); ConnectUtils.addMetricsContextProperties(adminProps, config, kafkaClusterId); +adminProps.putIfAbsent(CLIENT_ID_CONFIG, "connect-cluster-" + config.groupId()); Review Comment: Yeah the advertised URL is the only identifier we have for a unique worker but it could be "ugly" and pretty long. Ideally users would provide a unique identifier (like the pod name) as the client.id and we would prefix/suffix it with the groupId and a short name like `shared-admin`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
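The kind of default being discussed could be assembled roughly as below; the exact prefix/suffix format is still being debated in the review, so the `connect-cluster-<groupId>-<role>` shape here is only illustrative.

```java
import java.util.HashMap;
import java.util.Map;

public class ConnectClientIdDefaultSketch {

    static final String CLIENT_ID_CONFIG = "client.id";

    /**
     * Give the shared admin client a meaningful client.id unless the user already
     * supplied one; groupId identifies the Connect cluster and the short role
     * suffix distinguishes the client within the worker.
     */
    static Map<String, Object> withDefaultClientId(Map<String, Object> adminProps,
                                                   String groupId,
                                                   String role) {
        Map<String, Object> props = new HashMap<>(adminProps);
        props.putIfAbsent(CLIENT_ID_CONFIG, "connect-cluster-" + groupId + "-" + role);
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> props = withDefaultClientId(new HashMap<>(), "my-connect-cluster", "shared-admin");
        System.out.println(props.get(CLIENT_ID_CONFIG)); // connect-cluster-my-connect-cluster-shared-admin
    }
}
```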
[jira] [Commented] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance
[ https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17625187#comment-17625187 ] Shawn Wang commented on KAFKA-14016: [~ableegoldman] Yes, we experienced KAFKA-13891. Here is the full history of our case: We have several consumer groups with more than 2000 consumers each. We are using Kafka Broker 2.3 && Kafka Client 2.5, and we enabled StaticMembership && Cooperative Rebalance to avoid STW time during rebalances. # we found that in some cases a partition could be assigned to multiple consumers, so we patched KAFKA-12984, KAFKA-12983, KAFKA-13406 # after we deployed online, we found that some consumer groups would rebalance for a long time (2 hours) before finally becoming Stable, so we then patched KAFKA-13891. # after that was deployed, we experienced more partition lag when rebalances happen. Then I created this issue to get some advice. # Actually we worked around it by 'ignoring the generation value when the leader calculates the assignment' (just setting every member's generation to unknown). And after running online for more than 2 months, it looks good now. Regarding "The sticky assignment algorithm should account for this and IIRC will basically consider whoever has the highest valid generation for a partition as its previous owner", I think the code doesn't implement it this way [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java#L127] Please kindly correct me if I'm wrong. In this code we clear all previously owned partitions when we see a higher generation, so only the ownedPartitions with the highest generation remain valid. I think "The sticky assignment algorithm should account for this and IIRC will basically consider whoever has the highest valid generation for a partition as its previous owner" would also be a fix for this. > Revoke more partitions than expected in Cooperative rebalance > - > > Key: KAFKA-14016 > URL: https://issues.apache.org/jira/browse/KAFKA-14016 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.3.0 >Reporter: Shawn Wang >Priority: Major > Labels: new-rebalance-should-fix > > In https://issues.apache.org/jira/browse/KAFKA-13419 we found that some > consumer didn't reset generation and state after sync group fail with > REABALANCE_IN_PROGRESS error. > So we fixed it by reset generationId (no memberId) when sync group fail with > REABALANCE_IN_PROGRESS error. > But this change missed the reset part, so another change made in > https://issues.apache.org/jira/browse/KAFKA-13891 make this works. > After apply this change, we found that: sometimes consumer will revoker > almost 2/3 of the partitions with cooperative enabled. Because if a consumer > did a very quick re-join, other consumers will get REABALANCE_IN_PROGRESS in > syncGroup and revoked their partition before re-jion. 
example: > # consumer A1-A10 (ten consumers) joined and synced group successfully with > generation 1 > # New consumer B1 joined and start a rebalance > # all consumer joined successfully and then A1 need to revoke partition to > transfer to B1 > # A1 do a very quick syncGroup and re-join, because it revoked partition > # A2-A10 didn't send syncGroup before A1 re-join, so after the send > syncGruop, will get REBALANCE_IN_PROGRESS > # A2-A10 will revoke there partitions and re-join > So in this rebalance almost every partition revoked, which highly decrease > the benefit of Cooperative rebalance > I think instead of "{*}resetStateAndRejoin{*} when > *RebalanceInProgressException* errors happend in {*}sync group{*}" we need > another way to fix it. > Here is my proposal: > # revert the change in https://issues.apache.org/jira/browse/KAFKA-13891 > # In Server Coordinator handleSyncGroup when generationId checked and group > state is PreparingRebalance. We can send the assignment along with the error > code REBALANCE_IN_PROGRESS. ( i think it's safe since we verified the > generation first ) > # When get the REBALANCE_IN_PROGRESS error in client, try to apply the > assignment first and then set the rejoinNeeded = true to make it re-join > immediately -- This message was sent by Atlassian Jira (v8.20.10#820010)
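The behaviour described in the comment above, where the assignor discards all previously owned partitions reported with anything lower than the highest generation it has seen, can be sketched roughly as follows. This is a simplified illustration of that logic, not the actual AbstractStickyAssignor code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HighestGenerationOwnershipSketch {

    static class MemberSubscription {
        final String memberId;
        final int generation;
        final List<String> ownedPartitions;

        MemberSubscription(String memberId, int generation, List<String> ownedPartitions) {
            this.memberId = memberId;
            this.generation = generation;
            this.ownedPartitions = ownedPartitions;
        }
    }

    /**
     * Keep previous ownership only for members that reported the highest generation
     * seen so far; ownership reported with older generations is discarded wholesale.
     */
    static Map<String, List<String>> previousOwnership(List<MemberSubscription> subscriptions) {
        int maxGeneration = -1;
        Map<String, List<String>> ownedByMember = new HashMap<>();
        for (MemberSubscription sub : subscriptions) {
            if (sub.generation > maxGeneration) {
                // A higher generation invalidates everything collected before it.
                ownedByMember.clear();
                maxGeneration = sub.generation;
            }
            if (sub.generation == maxGeneration) {
                ownedByMember.put(sub.memberId, new ArrayList<>(sub.ownedPartitions));
            }
        }
        return ownedByMember;
    }
}
```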
[jira] [Comment Edited] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance
[ https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17625187#comment-17625187 ] Shawn Wang edited comment on KAFKA-14016 at 10/27/22 4:08 PM: -- [~ableegoldman] Yes, we experienced KAFKA-13891 Here is the full history of our case: We have several consumer group which has more than 2000 consumers. And we are using Kafka Broker 2.3 && Kakfa Client 2.5. We enabled the build-in StaticMembership && Coopearative Rebalance to avoid STW time in rebalance. # we found that in some cases the partition will be duplicated assigned, so we patched KAFKA-12984 , KAFKA-12983, KAFKA-13406 # after we deployed online, we found that some consumer group will rebalance for a long time ( 2 hours) before it finally get Stable, so we then patched KAFKA-13891. # after deployed, we experienced more partition lag when rebalance happens. Then i created this issue and try to get some advise. # Actually we workaround it by 'ignore the generation value when leader calculate assignment' ( just set every memeber's generation to unknonw ). And after we go online for more than 2 months, it looks good now. For "The sticky assignment algorithm should account for this and IIRC will basically consider whoever has the highest valid generation for a partition as its previous owner", i think in the code does't implement in this way [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java#L127] Please kindly correct me if i'm wrong. In this code we clear all previous owned parittions if we got a higer geneartion, so only the ownedPartitions with highest generation will be valid I think "The sticky assignment algorithm should account for this and IIRC will basically consider whoever has the highest valid generation for a partition as its previous owner" is also a fix for this. was (Author: JIRAUSER289108): [~ableegoldman] Yes, we experienced KAFKA-13891 Here is the full history of our case: We have several consumer group which has more than 2000 consumers. And we are using Kafka Broker 2.3 && Kakfa Client 2.5. We enabled StaticMembership && Coopearative Rebalance to avoid STW time in rebalance. # we found that in some cases the partition will be duplicated assigned, so we patched KAFKA-12984 , KAFKA-12983, KAFKA-13406 # after we deployed online, we found that some consumer group will rebalance for a long time ( 2 hours) before it finally get Stable, so we then patched KAFKA-13891. # after deployed, we experienced more partition lag when rebalance happens. Then i created this issue and try to get some advise. # Actually we workaround it by 'ignore the generation value when leader calculate assignment' ( just set every memeber's generation to unknonw ). And after we go online for more than 2 months, it looks good now. For "The sticky assignment algorithm should account for this and IIRC will basically consider whoever has the highest valid generation for a partition as its previous owner", i think in the code does't implement in this way [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java#L127] Please kindly correct me if i'm wrong. 
In this code we clear all previous owned parittions if we got a higer geneartion, so only the ownedPartitions with highest generation will be valid I think "The sticky assignment algorithm should account for this and IIRC will basically consider whoever has the highest valid generation for a partition as its previous owner" is also a fix for this. > Revoke more partitions than expected in Cooperative rebalance > - > > Key: KAFKA-14016 > URL: https://issues.apache.org/jira/browse/KAFKA-14016 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.3.0 >Reporter: Shawn Wang >Priority: Major > Labels: new-rebalance-should-fix > > In https://issues.apache.org/jira/browse/KAFKA-13419 we found that some > consumer didn't reset generation and state after sync group fail with > REABALANCE_IN_PROGRESS error. > So we fixed it by reset generationId (no memberId) when sync group fail with > REABALANCE_IN_PROGRESS error. > But this change missed the reset part, so another change made in > https://issues.apache.org/jira/browse/KAFKA-13891 make this works. > After apply this change, we found that: sometimes consumer will revoker > almost 2/3 of the partitions with cooperative enabled. Because if a consumer > did a very quick re-join, other consumers will get REABALANCE_IN_PROGRESS in > syncGroup and revoked their partition before re-jion. example: > #
[jira] [Commented] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance
[ https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17625192#comment-17625192 ] Shawn Wang commented on KAFKA-14016: I also vote for revertingKAFKA-13891 since that previous state in 3 happens randomly and doesn't have a big impact. But i don't think [KIP-792|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336614] can solve this problem. It moved generationId from userData to ConsumerProtocolSubscription and don't change the logic of build-in CooperativeStickyAssignor. > Revoke more partitions than expected in Cooperative rebalance > - > > Key: KAFKA-14016 > URL: https://issues.apache.org/jira/browse/KAFKA-14016 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.3.0 >Reporter: Shawn Wang >Priority: Major > Labels: new-rebalance-should-fix > > In https://issues.apache.org/jira/browse/KAFKA-13419 we found that some > consumer didn't reset generation and state after sync group fail with > REABALANCE_IN_PROGRESS error. > So we fixed it by reset generationId (no memberId) when sync group fail with > REABALANCE_IN_PROGRESS error. > But this change missed the reset part, so another change made in > https://issues.apache.org/jira/browse/KAFKA-13891 make this works. > After apply this change, we found that: sometimes consumer will revoker > almost 2/3 of the partitions with cooperative enabled. Because if a consumer > did a very quick re-join, other consumers will get REABALANCE_IN_PROGRESS in > syncGroup and revoked their partition before re-jion. example: > # consumer A1-A10 (ten consumers) joined and synced group successfully with > generation 1 > # New consumer B1 joined and start a rebalance > # all consumer joined successfully and then A1 need to revoke partition to > transfer to B1 > # A1 do a very quick syncGroup and re-join, because it revoked partition > # A2-A10 didn't send syncGroup before A1 re-join, so after the send > syncGruop, will get REBALANCE_IN_PROGRESS > # A2-A10 will revoke there partitions and re-join > So in this rebalance almost every partition revoked, which highly decrease > the benefit of Cooperative rebalance > I think instead of "{*}resetStateAndRejoin{*} when > *RebalanceInProgressException* errors happend in {*}sync group{*}" we need > another way to fix it. > Here is my proposal: > # revert the change in https://issues.apache.org/jira/browse/KAFKA-13891 > # In Server Coordinator handleSyncGroup when generationId checked and group > state is PreparingRebalance. We can send the assignment along with the error > code REBALANCE_IN_PROGRESS. ( i think it's safe since we verified the > generation first ) > # When get the REBALANCE_IN_PROGRESS error in client, try to apply the > assignment first and then set the rejoinNeeded = true to make it re-join > immediately -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance
[ https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17625192#comment-17625192 ] Shawn Wang edited comment on KAFKA-14016 at 10/27/22 4:13 PM: -- I also vote for reverting KAFKA-13891 since the case happens randomly and doesn't have a big impact. But i don't think [KIP-792|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336614] can solve this problem. It moved generationId from userData to ConsumerProtocolSubscription and don't change the logic of build-in CooperativeStickyAssignor. was (Author: JIRAUSER289108): I also vote for revertingKAFKA-13891 since that previous state in 3 happens randomly and doesn't have a big impact. But i don't think [KIP-792|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336614] can solve this problem. It moved generationId from userData to ConsumerProtocolSubscription and don't change the logic of build-in CooperativeStickyAssignor. > Revoke more partitions than expected in Cooperative rebalance > - > > Key: KAFKA-14016 > URL: https://issues.apache.org/jira/browse/KAFKA-14016 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.3.0 >Reporter: Shawn Wang >Priority: Major > Labels: new-rebalance-should-fix > > In https://issues.apache.org/jira/browse/KAFKA-13419 we found that some > consumer didn't reset generation and state after sync group fail with > REABALANCE_IN_PROGRESS error. > So we fixed it by reset generationId (no memberId) when sync group fail with > REABALANCE_IN_PROGRESS error. > But this change missed the reset part, so another change made in > https://issues.apache.org/jira/browse/KAFKA-13891 make this works. > After apply this change, we found that: sometimes consumer will revoker > almost 2/3 of the partitions with cooperative enabled. Because if a consumer > did a very quick re-join, other consumers will get REABALANCE_IN_PROGRESS in > syncGroup and revoked their partition before re-jion. example: > # consumer A1-A10 (ten consumers) joined and synced group successfully with > generation 1 > # New consumer B1 joined and start a rebalance > # all consumer joined successfully and then A1 need to revoke partition to > transfer to B1 > # A1 do a very quick syncGroup and re-join, because it revoked partition > # A2-A10 didn't send syncGroup before A1 re-join, so after the send > syncGruop, will get REBALANCE_IN_PROGRESS > # A2-A10 will revoke there partitions and re-join > So in this rebalance almost every partition revoked, which highly decrease > the benefit of Cooperative rebalance > I think instead of "{*}resetStateAndRejoin{*} when > *RebalanceInProgressException* errors happend in {*}sync group{*}" we need > another way to fix it. > Here is my proposal: > # revert the change in https://issues.apache.org/jira/browse/KAFKA-13891 > # In Server Coordinator handleSyncGroup when generationId checked and group > state is PreparingRebalance. We can send the assignment along with the error > code REBALANCE_IN_PROGRESS. ( i think it's safe since we verified the > generation first ) > # When get the REBALANCE_IN_PROGRESS error in client, try to apply the > assignment first and then set the rejoinNeeded = true to make it re-join > immediately -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance
[ https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17625192#comment-17625192 ] Shawn Wang edited comment on KAFKA-14016 at 10/27/22 4:13 PM: -- I also vote for reverting KAFKA-13891 since the case happens randomly and doesn't have a big impact. But i don't think [KIP-792|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336614] can solve this problem. It moved generationId from userData to ConsumerProtocolSubscription and didn't change the logic of build-in CooperativeStickyAssignor. was (Author: JIRAUSER289108): I also vote for reverting KAFKA-13891 since the case happens randomly and doesn't have a big impact. But i don't think [KIP-792|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336614] can solve this problem. It moved generationId from userData to ConsumerProtocolSubscription and don't change the logic of build-in CooperativeStickyAssignor. > Revoke more partitions than expected in Cooperative rebalance > - > > Key: KAFKA-14016 > URL: https://issues.apache.org/jira/browse/KAFKA-14016 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.3.0 >Reporter: Shawn Wang >Priority: Major > Labels: new-rebalance-should-fix > > In https://issues.apache.org/jira/browse/KAFKA-13419 we found that some > consumer didn't reset generation and state after sync group fail with > REABALANCE_IN_PROGRESS error. > So we fixed it by reset generationId (no memberId) when sync group fail with > REABALANCE_IN_PROGRESS error. > But this change missed the reset part, so another change made in > https://issues.apache.org/jira/browse/KAFKA-13891 make this works. > After apply this change, we found that: sometimes consumer will revoker > almost 2/3 of the partitions with cooperative enabled. Because if a consumer > did a very quick re-join, other consumers will get REABALANCE_IN_PROGRESS in > syncGroup and revoked their partition before re-jion. example: > # consumer A1-A10 (ten consumers) joined and synced group successfully with > generation 1 > # New consumer B1 joined and start a rebalance > # all consumer joined successfully and then A1 need to revoke partition to > transfer to B1 > # A1 do a very quick syncGroup and re-join, because it revoked partition > # A2-A10 didn't send syncGroup before A1 re-join, so after the send > syncGruop, will get REBALANCE_IN_PROGRESS > # A2-A10 will revoke there partitions and re-join > So in this rebalance almost every partition revoked, which highly decrease > the benefit of Cooperative rebalance > I think instead of "{*}resetStateAndRejoin{*} when > *RebalanceInProgressException* errors happend in {*}sync group{*}" we need > another way to fix it. > Here is my proposal: > # revert the change in https://issues.apache.org/jira/browse/KAFKA-13891 > # In Server Coordinator handleSyncGroup when generationId checked and group > state is PreparingRebalance. We can send the assignment along with the error > code REBALANCE_IN_PROGRESS. ( i think it's safe since we verified the > generation first ) > # When get the REBALANCE_IN_PROGRESS error in client, try to apply the > assignment first and then set the rejoinNeeded = true to make it re-join > immediately -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] rhauch merged pull request #12621: MINOR: Migrate connect system tests to KRaft
rhauch merged PR #12621: URL: https://github.com/apache/kafka/pull/12621 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance
[ https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17625187#comment-17625187 ] Shawn Wang edited comment on KAFKA-14016 at 10/27/22 4:20 PM: -- [~ableegoldman] Yes, we experienced KAFKA-13891. Here is the full history of our case: We have several consumer groups, each with more than 2000 consumers, and we are using Kafka Broker 2.3 && Kafka Client 2.5. We enabled the built-in StaticMembership && Cooperative Rebalance to avoid STW time in rebalance. # We found that in some cases a partition would be assigned to two consumers at once, so we patched KAFKA-12984, KAFKA-12983, KAFKA-13406 # After we deployed online, we found that some consumer groups would rebalance for a long time (2 hours) before finally becoming Stable, so we then patched KAFKA-13891. # After deploying, we experienced more partition lag when rebalances happen. Then I created this issue to try to get some advice. # We finally worked around it by 'ignoring the generation value when the leader calculates the assignment' (just setting every member's generation to unknown) && reverting KAFKA-13891. And after being online for more than 2 months, it looks good now. Regarding "The sticky assignment algorithm should account for this and IIRC will basically consider whoever has the highest valid generation for a partition as its previous owner", I don't think the code implements it that way: [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java#L127] Please kindly correct me if I'm wrong. In this code we clear all previously owned partitions if we got a higher generation, so only the ownedPartitions with the highest generation will be valid. I think "The sticky assignment algorithm should account for this and IIRC will basically consider whoever has the highest valid generation for a partition as its previous owner" is also a fix for this. was (Author: JIRAUSER289108): [~ableegoldman] Yes, we experienced KAFKA-13891 Here is the full history of our case: We have several consumer group which has more than 2000 consumers. And we are using Kafka Broker 2.3 && Kakfa Client 2.5. We enabled the build-in StaticMembership && Coopearative Rebalance to avoid STW time in rebalance. # we found that in some cases the partition will be duplicated assigned, so we patched KAFKA-12984 , KAFKA-12983, KAFKA-13406 # after we deployed online, we found that some consumer group will rebalance for a long time ( 2 hours) before it finally get Stable, so we then patched KAFKA-13891. # after deployed, we experienced more partition lag when rebalance happens. Then i created this issue and try to get some advise. # Actually we workaround it by 'ignore the generation value when leader calculate assignment' ( just set every memeber's generation to unknonw ). And after we go online for more than 2 months, it looks good now. For "The sticky assignment algorithm should account for this and IIRC will basically consider whoever has the highest valid generation for a partition as its previous owner", i think in the code does't implement in this way [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java#L127] Please kindly correct me if i'm wrong. 
In this code we clear all previous owned parittions if we got a higer geneartion, so only the ownedPartitions with highest generation will be valid I think "The sticky assignment algorithm should account for this and IIRC will basically consider whoever has the highest valid generation for a partition as its previous owner" is also a fix for this. > Revoke more partitions than expected in Cooperative rebalance > - > > Key: KAFKA-14016 > URL: https://issues.apache.org/jira/browse/KAFKA-14016 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.3.0 >Reporter: Shawn Wang >Priority: Major > Labels: new-rebalance-should-fix > > In https://issues.apache.org/jira/browse/KAFKA-13419 we found that some > consumer didn't reset generation and state after sync group fail with > REABALANCE_IN_PROGRESS error. > So we fixed it by reset generationId (no memberId) when sync group fail with > REABALANCE_IN_PROGRESS error. > But this change missed the reset part, so another change made in > https://issues.apache.org/jira/browse/KAFKA-13891 make this works. > After apply this change, we found that: sometimes consumer will revoker > almost 2/3 of the partitions with cooperative enabled. Because if a consumer > did a very quick re-join, other consumers will get REABALANCE_IN_PROGRESS in > syncGroup and revoked their part
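A minimal, self-contained Java sketch of the generation handling described in the comment above — the assignor keeps ownedPartitions only from members reporting the highest generation and treats claims from older generations as stale. This is an illustration of the idea only, not the actual AbstractStickyAssignor implementation; all names below are invented for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a simplified model of "only the ownedPartitions with the
// highest generation are considered valid previous ownership".
public class HighestGenerationOwnership {

    static Map<String, List<String>> previousOwnership(Map<String, Integer> memberGenerations,
                                                       Map<String, List<String>> memberOwnedPartitions) {
        Map<String, List<String>> consumerToOwnedPartitions = new HashMap<>();
        int maxGeneration = -1; // highest generation reported so far

        for (Map.Entry<String, Integer> entry : memberGenerations.entrySet()) {
            String member = entry.getKey();
            int generation = entry.getValue();
            List<String> owned = memberOwnedPartitions.getOrDefault(member, List.of());

            if (generation > maxGeneration) {
                // A newer generation invalidates every ownership claim collected so far.
                consumerToOwnedPartitions.clear();
                maxGeneration = generation;
                consumerToOwnedPartitions.put(member, new ArrayList<>(owned));
            } else if (generation == maxGeneration) {
                consumerToOwnedPartitions.put(member, new ArrayList<>(owned));
            }
            // generation < maxGeneration: the claim is stale and ignored.
        }
        return consumerToOwnedPartitions;
    }

    public static void main(String[] args) {
        Map<String, Integer> generations = Map.of("A1", 1, "A2", 2);
        Map<String, List<String>> owned = Map.of("A1", List.of("t-0", "t-1"), "A2", List.of("t-2"));
        // Only A2's claim survives, because it reported the higher generation.
        System.out.println(previousOwnership(generations, owned));
    }
}
```

Under this model, the workaround described above (treating every member's generation as unknown) would presumably leave all reported ownedPartitions in play rather than dropping the lower-generation ones.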
[GitHub] [kafka] rhauch commented on pull request #12621: MINOR: Migrate connect system tests to KRaft
rhauch commented on PR #12621: URL: https://github.com/apache/kafka/pull/12621#issuecomment-1293774760 Merged to the `trunk` branch and cherry-picked to the `3.3` branch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
mimaison commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r1007096564 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeForwardingAdminWithLocalMetadata.java: ## @@ -0,0 +1,117 @@ +package org.apache.kafka.connect.mirror.clients.admin; Review Comment: This is missing the license ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeLocalMetadataStore.java: ## @@ -0,0 +1,15 @@ +package org.apache.kafka.connect.mirror.clients.admin; Review Comment: This is missing the license ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java: ## @@ -0,0 +1,596 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.mirror.integration; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.DescribeConfigsResult; +import org.apache.kafka.clients.admin.NewPartitions; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AccessControlEntryFilter; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourcePatternFilter; +import org.apache.kafka.common.resource.ResourceType; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter; +import org.apache.kafka.connect.mirror.MirrorCheckpointConnector; +import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector; +import org.apache.kafka.connect.mirror.MirrorMakerConfig; +import org.apache.kafka.connect.mirror.MirrorSourceConnector; +import org.apache.kafka.connect.mirror.SourceAndTarget; +import org.apache.kafka.connect.mirror.clients.admin.FakeForwardingAdminWithLocalMetadata; +import org.apache.kafka.connect.mirror.clients.admin.FakeLocalMetadataStore; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import 
org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; +import org.apache.kafka.connect.util.clusters.UngracefulShutdownException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.FORWARDING_ADMIN_CLASS; +import static org.apache.kafka.connect.mirror.TestUtils.generateRecords; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX; +import static org.apache.kafka.test.TestUtils.waitForCondition; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests MM2 is using provided ForwardingAd
[GitHub] [kafka] dajac commented on a diff in pull request #12783: KAFKA-14334: complete delayed purgatory after replication
dajac commented on code in PR #12783: URL: https://github.com/apache/kafka/pull/12783#discussion_r1007133812 ## core/src/main/scala/kafka/server/ReplicaFetcherThread.scala: ## @@ -132,9 +141,17 @@ class ReplicaFetcherThread(name: String, brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes) +logAppendInfo.foreach { _ => partitionsWithNewRecords += topicPartition } Review Comment: Note that the HWM is updated separately here: `updateHighWatermarkAndStartOffset`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on a diff in pull request #12769: KAFKA-14314: Add check for null upstreamTopic
mimaison commented on code in PR #12769: URL: https://github.com/apache/kafka/pull/12769#discussion_r1007143335 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java: ## @@ -319,4 +320,17 @@ public void testRefreshTopicPartitionsTopicOnTargetFirst() throws Exception { connector.refreshTopicPartitions(); verify(connector, times(1)).computeAndCreateTopicPartitions(); } + +@Test +public void testIsCycleWithNullUpstreamTopic() throws Exception { Review Comment: I think we can drop `throws Exception` ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java: ## @@ -319,4 +320,17 @@ public void testRefreshTopicPartitionsTopicOnTargetFirst() throws Exception { connector.refreshTopicPartitions(); verify(connector, times(1)).computeAndCreateTopicPartitions(); } + +@Test +public void testIsCycleWithNullUpstreamTopic() throws Exception { +class BadReplicationPolicy extends DefaultReplicationPolicy { Review Comment: What about `CustomReplicationPolicy` as it's not really a bad policy, `null` is allowed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #12783: KAFKA-14334: complete delayed purgatory after replication
dajac commented on code in PR #12783: URL: https://github.com/apache/kafka/pull/12783#discussion_r1007153544 ## core/src/main/scala/kafka/server/ReplicaFetcherThread.scala: ## @@ -132,9 +141,17 @@ class ReplicaFetcherThread(name: String, brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes) +logAppendInfo.foreach { _ => partitionsWithNewRecords += topicPartition } Review Comment: I am actually not sure about this. I need to think about it a little more. Let me know what you think. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on pull request #12792: [Test]gradle oom fix
philipnee commented on PR #12792: URL: https://github.com/apache/kafka/pull/12792#issuecomment-1293833011 [0c166c0](https://github.com/apache/kafka/pull/12792/commits/0c166c0cc456619b4168736820e05b06dc7ae7ab) - no gradle oom -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee closed pull request #12038: [WIP] KAFKA-13421
philipnee closed pull request #12038: [WIP] KAFKA-13421 URL: https://github.com/apache/kafka/pull/12038 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
philipnee commented on code in PR #12149: URL: https://github.com/apache/kafka/pull/12149#discussion_r1007181238 ## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ## @@ -618,7 +618,8 @@ private TransactionManager configureTransactionState(ProducerConfig config, * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker * does not support transactions (i.e. if its version is lower than 0.11.0.0) * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured - * transactional.id is not authorized. See the exception for more details + * transactional.id is not authorized, or the idempotent producer id is unavailable. See the exception for Review Comment: One could get an authorization error when requesting a producerId; it is not necessarily caused by an unavailable broker. Yes, maybe remove the word "fatal". Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
philipnee commented on code in PR #12149: URL: https://github.com/apache/kafka/pull/12149#discussion_r1007185990 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -155,7 +155,7 @@ private enum State { private boolean isTransitionValid(State source, State target) { switch (target) { case UNINITIALIZED: -return source == READY; +return source == READY || source == ABORTABLE_ERROR; Review Comment: It's one way (that I could think of) to propagate the authorization error from the sender loop; otherwise, the sender will continue to retry, which causes timeouts in some cases. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
philipnee commented on code in PR #12149: URL: https://github.com/apache/kafka/pull/12149#discussion_r1007187187 ## clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java: ## @@ -1237,6 +1238,34 @@ public void testInitTransactionWhileThrottled() { } } +@Test +public void testClusterAuthorizationFailure() throws Exception { +int maxBlockMs = 500; + +Map configs = new HashMap<>(); +configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs); +configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); +configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); +configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some-txn"); + +Time time = new MockTime(1); +MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1)); +ProducerMetadata metadata = newMetadata(500, Long.MAX_VALUE); + +MockClient client = new MockClient(time, metadata); +client.updateMetadata(initialUpdateResponse); + + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some-txn", NODE)); +client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.CLUSTER_AUTHORIZATION_FAILED)); +Producer producer = kafkaProducer(configs, new StringSerializer(), +new StringSerializer(), metadata, client, null, time); +assertThrows(ClusterAuthorizationException.class, producer::initTransactions); + +// retry initTransactions after the ClusterAuthorizationException not being thrown +client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE)); +TestUtils.retryOnExceptionWithTimeout(1000, 100, producer::initTransactions); Review Comment: yes! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
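For context, the test above asserts that initTransactions() can be retried after a ClusterAuthorizationException instead of leaving the producer permanently broken. Below is a hedged sketch of the caller-side pattern this enables, assuming the patched behavior discussed in this PR; the bootstrap server, transactional id, and retry policy are placeholders, not part of the PR.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.serialization.StringSerializer;

public class RetryInitTransactions {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some-txn");          // placeholder
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            while (true) {
                try {
                    producer.initTransactions();
                    break; // producer id obtained, transactions can start
                } catch (ClusterAuthorizationException e) {
                    // With the change discussed in this PR, the transaction manager can move
                    // back to an uninitialized state, so initTransactions() may be called
                    // again, e.g. once the missing IdempotentWrite ACL has been granted.
                    Thread.sleep(1000);
                }
            }
        }
    }
}
```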
[GitHub] [kafka] hachikuji commented on pull request #12790: KAFKA-14337: correctly remove topicsWithCollisionChars after topic deletion
hachikuji commented on PR #12790: URL: https://github.com/apache/kafka/pull/12790#issuecomment-1293873487 Great find! Agree with @dengziming that we should have a test in `ReplicationControlManagerTest`. Otherwise, LGTM. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bbejeck commented on pull request #12465: KAFKA-12950: Replace EasyMock and PowerMock with Mockito for KafkaStreamsTest
bbejeck commented on PR #12465: URL: https://github.com/apache/kafka/pull/12465#issuecomment-1293885171 Failures unrelated -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12783: KAFKA-14334: complete delayed purgatory after replication
jeffkbkim commented on code in PR #12783: URL: https://github.com/apache/kafka/pull/12783#discussion_r1007204668 ## core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala: ## @@ -1100,6 +1100,66 @@ class ReplicaFetcherThreadTest { assertEquals(Collections.singletonList(tid1p0), fetchRequestBuilder2.removed()) } + @Test + def testFetchResponseWithPartitionData(): Unit = { +val appendInfo: LogAppendInfo = mock(classOf[LogAppendInfo]) +verifyLocalFetchCompletionAfterReplication(Some(appendInfo)) + } + + @Test + def testFetchResponseWithNoPartitionData(): Unit = { +verifyLocalFetchCompletionAfterReplication(appendInfo = None) + } + + private def verifyLocalFetchCompletionAfterReplication(appendInfo: Option[LogAppendInfo]): Unit = { +val props = TestUtils.createBrokerConfig(1, "localhost:1234") +val config = KafkaConfig.fromProps(props) + +val mockBlockingSend: BlockingSend = mock(classOf[BlockingSend]) +when(mockBlockingSend.brokerEndPoint()).thenReturn(brokerEndPoint) + +val log: UnifiedLog = mock(classOf[UnifiedLog]) + +val partition: Partition = mock(classOf[Partition]) +when(partition.localLogOrException).thenReturn(log) +when(partition.appendRecordsToFollowerOrFutureReplica(any[MemoryRecords], any[Boolean])).thenReturn(appendInfo) + +val replicaManager: ReplicaManager = mock(classOf[ReplicaManager]) + when(replicaManager.getPartitionOrException(any[TopicPartition])).thenReturn(partition) +val brokerTopicStats = new BrokerTopicStats +when(replicaManager.brokerTopicStats).thenReturn(brokerTopicStats) + +val replicaQuota: ReplicaQuota = mock(classOf[ReplicaQuota]) + +val thread = createReplicaFetcherThread( + name = "replica-fetcher", + fetcherId = 0, + brokerConfig = config, + failedPartitions = failedPartitions, + replicaMgr = replicaManager, + quota = replicaQuota, + leaderEndpointBlockingSend = mockBlockingSend) + +val tp0 = new TopicPartition("testTopic", 0) +val tp1 = new TopicPartition("testTopic", 1) +val records = MemoryRecords.withRecords(CompressionType.NONE, + new SimpleRecord(1000, "foo".getBytes(StandardCharsets.UTF_8))) +val partitionData: thread.FetchData = new FetchResponseData.PartitionData() + .setRecords(records) + +thread.processPartitionData(tp0, 0, partitionData.setPartitionIndex(0)) +thread.processPartitionData(tp1, 0, partitionData.setPartitionIndex(1)) +thread.doWork() +appendInfo match { + case Some(_) => +val argument: ArgumentCaptor[Seq[TopicPartition]] = ArgumentCaptor.forClass(classOf[Seq[TopicPartition]]) +verify(replicaManager, times(1)).completeDelayedFetchRequests(argument.capture()) +assertEquals(Seq(tp0, tp1), argument.getValue) + case None => +verify(replicaManager, times(0)).completeDelayedFetchRequests(any[Seq[TopicPartition]]) Review Comment: i'm not sure i follow. replicaManager.completeDelayedFetchRequests is not called when appendInfo is empty. i will add a check for both cases to assert that the underlying buffer is empty after thread.doWork -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
jolshan commented on code in PR #12149: URL: https://github.com/apache/kafka/pull/12149#discussion_r1007207887 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -155,7 +155,7 @@ private enum State { private boolean isTransitionValid(State source, State target) { switch (target) { case UNINITIALIZED: -return source == READY; +return source == READY || source == ABORTABLE_ERROR; Review Comment: Hmmm. I see. I'm just wondering if we are hijacking the current state machine in an unexpected way and if there are implications there. I suppose we are only following this path on these specific error types, but I wonder if we are missing anything existing by changing the valid transitions and/or opening up the potential for something else in the future. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bbejeck merged pull request #12465: KAFKA-12950: Replace EasyMock and PowerMock with Mockito for KafkaStreamsTest
bbejeck merged PR #12465: URL: https://github.com/apache/kafka/pull/12465 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bbejeck commented on pull request #12465: KAFKA-12950: Replace EasyMock and PowerMock with Mockito for KafkaStreamsTest
bbejeck commented on PR #12465: URL: https://github.com/apache/kafka/pull/12465#issuecomment-1293889869 Merged #12465 into trunk -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bbejeck commented on pull request #12465: KAFKA-12950: Replace EasyMock and PowerMock with Mockito for KafkaStreamsTest
bbejeck commented on PR #12465: URL: https://github.com/apache/kafka/pull/12465#issuecomment-1293890128 Thanks for the contribution @divijvaidya ! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12783: KAFKA-14334: complete delayed purgatory after replication
jeffkbkim commented on code in PR #12783: URL: https://github.com/apache/kafka/pull/12783#discussion_r1007215458 ## core/src/main/scala/kafka/server/ReplicaFetcherThread.scala: ## @@ -132,9 +141,17 @@ class ReplicaFetcherThread(name: String, brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes) +logAppendInfo.foreach { _ => partitionsWithNewRecords += topicPartition } Review Comment: i think this makes sense. consumer fetches will at most fetch up to the HWM so we can add the topic partition only when `LeaderHwChange.Increased` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
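As a rough, illustrative model of the mechanism being discussed (the real implementation is the Scala broker code in ReplicaFetcherThread/ReplicaManager and is considerably more involved): delayed fetches wait in a purgatory keyed by partition, and after replicating, the follower completes only the partitions whose high watermark actually increased. All names in this toy Java sketch are invented.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Toy model: park delayed fetches per partition, complete them when the HWM increases.
public class DelayedFetchPurgatoryModel {

    static final class DelayedFetch {
        final long fetchOffset;
        final CompletableFuture<Long> result = new CompletableFuture<>(); // completes with the new HWM
        DelayedFetch(long fetchOffset) { this.fetchOffset = fetchOffset; }
    }

    private final Map<String, Queue<DelayedFetch>> purgatory = new ConcurrentHashMap<>();
    private final Map<String, Long> highWatermarks = new ConcurrentHashMap<>();

    // A fetch that cannot be served yet waits here.
    CompletableFuture<Long> delayFetch(String partition, long fetchOffset) {
        DelayedFetch fetch = new DelayedFetch(fetchOffset);
        purgatory.computeIfAbsent(partition, p -> new ConcurrentLinkedQueue<>()).add(fetch);
        return fetch.result;
    }

    // Called after replication: complete waiters only if the HWM actually moved
    // (the analogue of acting only on LeaderHwChange.Increased).
    void onReplicationCompleted(String partition, long newHighWatermark) {
        long previous = highWatermarks.getOrDefault(partition, 0L);
        if (newHighWatermark <= previous) {
            return; // no HWM increase, nothing new for waiting consumers to read
        }
        highWatermarks.put(partition, newHighWatermark);
        purgatory.getOrDefault(partition, new ConcurrentLinkedQueue<>()).removeIf(fetch -> {
            if (fetch.fetchOffset < newHighWatermark) {
                fetch.result.complete(newHighWatermark);
                return true;
            }
            return false;
        });
    }

    public static void main(String[] args) {
        DelayedFetchPurgatoryModel model = new DelayedFetchPurgatoryModel();
        CompletableFuture<Long> pending = model.delayFetch("test-fetch-from-follower-0", 200L);
        model.onReplicationCompleted("test-fetch-from-follower-0", 201L);
        System.out.println("fetch completed, new HWM = " + pending.join());
    }
}
```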
[jira] [Commented] (KAFKA-12950) Replace EasyMock and PowerMock with Mockito for KafkaStreamsTest
[ https://issues.apache.org/jira/browse/KAFKA-12950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17625246#comment-17625246 ] Bill Bejeck commented on KAFKA-12950: - Merged into trunk > Replace EasyMock and PowerMock with Mockito for KafkaStreamsTest > > > Key: KAFKA-12950 > URL: https://issues.apache.org/jira/browse/KAFKA-12950 > Project: Kafka > Issue Type: Sub-task > Components: streams, unit tests >Reporter: Josep Prat >Assignee: Divij Vaidya >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14337) topic name with "." cannot be created after deletion
[ https://issues.apache.org/jira/browse/KAFKA-14337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio updated KAFKA-14337: --- Fix Version/s: 3.4.0 3.3.2 > topic name with "." cannot be created after deletion > > > Key: KAFKA-14337 > URL: https://issues.apache.org/jira/browse/KAFKA-14337 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.3.1 >Reporter: thanhnd96 >Assignee: Luke Chen >Priority: Critical > Fix For: 3.4.0, 3.3.2 > > > Hi admin, > My issue is after i create topic like topic.AAA or Topic.AAA.01 then delete 1 > of the other 2 topics. > Then i can't create 1 of the 2 topics. > But i create topic test123 then delete and recreate fine. > This is log i tried to create topic.AAA > WARN [Controller 1] createTopics: failed with unknown server exception > NoSuchElementException at epoch 14 in 193 us. Renouncing leadership and > reverting to the last committed offset 28. > (org.apache.kafka.controller.QuorumController) > java.util.NoSuchElementException > at > org.apache.kafka.timeline.SnapshottableHashTable$CurrentIterator.next(SnapshottableHashTable.java:167) > at > org.apache.kafka.timeline.SnapshottableHashTable$CurrentIterator.next(SnapshottableHashTable.java:139) > at > org.apache.kafka.timeline.TimelineHashSet$ValueIterator.next(TimelineHashSet.java:120) > at > org.apache.kafka.controller.ReplicationControlManager.validateNewTopicNames(ReplicationControlManager.java:799) > at > org.apache.kafka.controller.ReplicationControlManager.createTopics(ReplicationControlManager.java:567) > at > org.apache.kafka.controller.QuorumController.lambda$createTopics$7(QuorumController.java:1832) > at > org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:767) > at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173) > at java.base/java.lang.Thread.run(Thread.java:829) > ERROR [Controller 1] processBrokerHeartbeat: unable to start processing > because of NotControllerException. > (org.apache.kafka.controller.QuorumController) > > I'm run kafka mode Kraft !!! > Tks admin. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] dajac commented on a diff in pull request #12783: KAFKA-14334: complete delayed purgatory after replication
dajac commented on code in PR #12783: URL: https://github.com/apache/kafka/pull/12783#discussion_r1007243383 ## core/src/main/scala/kafka/server/ReplicaFetcherThread.scala: ## @@ -132,9 +141,17 @@ class ReplicaFetcherThread(name: String, brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes) +logAppendInfo.foreach { _ => partitionsWithNewRecords += topicPartition } Review Comment: Yeah, that's right. We just need to ensure that the HWM is always <= log end offset. Otherwise, this does not work. I will read this code again tomorrow. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #12783: KAFKA-14334: complete delayed purgatory after replication
dajac commented on code in PR #12783: URL: https://github.com/apache/kafka/pull/12783#discussion_r1007243845 ## core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala: ## @@ -1100,6 +1100,66 @@ class ReplicaFetcherThreadTest { assertEquals(Collections.singletonList(tid1p0), fetchRequestBuilder2.removed()) } + @Test + def testFetchResponseWithPartitionData(): Unit = { +val appendInfo: LogAppendInfo = mock(classOf[LogAppendInfo]) +verifyLocalFetchCompletionAfterReplication(Some(appendInfo)) + } + + @Test + def testFetchResponseWithNoPartitionData(): Unit = { +verifyLocalFetchCompletionAfterReplication(appendInfo = None) + } + + private def verifyLocalFetchCompletionAfterReplication(appendInfo: Option[LogAppendInfo]): Unit = { +val props = TestUtils.createBrokerConfig(1, "localhost:1234") +val config = KafkaConfig.fromProps(props) + +val mockBlockingSend: BlockingSend = mock(classOf[BlockingSend]) +when(mockBlockingSend.brokerEndPoint()).thenReturn(brokerEndPoint) + +val log: UnifiedLog = mock(classOf[UnifiedLog]) + +val partition: Partition = mock(classOf[Partition]) +when(partition.localLogOrException).thenReturn(log) +when(partition.appendRecordsToFollowerOrFutureReplica(any[MemoryRecords], any[Boolean])).thenReturn(appendInfo) + +val replicaManager: ReplicaManager = mock(classOf[ReplicaManager]) + when(replicaManager.getPartitionOrException(any[TopicPartition])).thenReturn(partition) +val brokerTopicStats = new BrokerTopicStats +when(replicaManager.brokerTopicStats).thenReturn(brokerTopicStats) + +val replicaQuota: ReplicaQuota = mock(classOf[ReplicaQuota]) + +val thread = createReplicaFetcherThread( + name = "replica-fetcher", + fetcherId = 0, + brokerConfig = config, + failedPartitions = failedPartitions, + replicaMgr = replicaManager, + quota = replicaQuota, + leaderEndpointBlockingSend = mockBlockingSend) + +val tp0 = new TopicPartition("testTopic", 0) +val tp1 = new TopicPartition("testTopic", 1) +val records = MemoryRecords.withRecords(CompressionType.NONE, + new SimpleRecord(1000, "foo".getBytes(StandardCharsets.UTF_8))) +val partitionData: thread.FetchData = new FetchResponseData.PartitionData() + .setRecords(records) + +thread.processPartitionData(tp0, 0, partitionData.setPartitionIndex(0)) +thread.processPartitionData(tp1, 0, partitionData.setPartitionIndex(1)) +thread.doWork() +appendInfo match { + case Some(_) => +val argument: ArgumentCaptor[Seq[TopicPartition]] = ArgumentCaptor.forClass(classOf[Seq[TopicPartition]]) +verify(replicaManager, times(1)).completeDelayedFetchRequests(argument.capture()) +assertEquals(Seq(tp0, tp1), argument.getValue) + case None => +verify(replicaManager, times(0)).completeDelayedFetchRequests(any[Seq[TopicPartition]]) Review Comment: ah, right. i missed the `times(0)`. my bad. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12793: [KAFKA-14328]: KafkaAdminClient should be Changing the exception level …
guozhangwang commented on code in PR #12793: URL: https://github.com/apache/kafka/pull/12793#discussion_r1007251372 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -835,6 +836,10 @@ final void fail(long now, Throwable throwable) { log.debug("{} failed: {}. Beginning retry #{}", this, prettyPrintException(throwable), tries); } +//Temporarily save the last exception of the call, Review Comment: nit: space after `//`. ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -944,7 +949,7 @@ int handleTimeouts(Collection calls, String msg) { Call call = iter.next(); int remainingMs = calcTimeoutMsRemainingAsInt(now, call.deadlineMs); if (remainingMs < 0) { -call.fail(now, new TimeoutException(msg + " Call: " + call.callName)); +call.fail(now, new TimeoutException(msg + " Call: " + call.callName,call.lastThrowable)); Review Comment: I think it's better to clarify it as "the last error causing retry is..", also nit: space after `,`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
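A minimal sketch of the pattern under review, using illustrative names rather than the real KafkaAdminClient internals: remember the last throwable that caused a retry and attach it as the cause of the eventual TimeoutException, so a user sees more than just "Timed out waiting for a node assignment. Call: metadata".

```java
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.TimeoutException;

// Illustrative names only; not the actual KafkaAdminClient.Call implementation.
public class RetriedCall {
    private final String callName;
    private Throwable lastThrowable; // most recent error that triggered a retry

    public RetriedCall(String callName) {
        this.callName = callName;
    }

    // Invoked on every retriable failure.
    public void failedAttempt(Throwable t) {
        lastThrowable = t;
    }

    // Invoked when the deadline is reached; the last retry cause is no longer lost.
    public TimeoutException timeout(String msg) {
        String suffix = lastThrowable == null ? "" : " The last error causing a retry was: " + lastThrowable;
        return new TimeoutException(msg + " Call: " + callName + "." + suffix, lastThrowable);
    }

    public static void main(String[] args) {
        RetriedCall call = new RetriedCall("metadata");
        call.failedAttempt(new LeaderNotAvailableException(
                "There is no leader for this topic-partition as we are in the middle of a leadership election."));
        System.out.println(call.timeout("Timed out waiting for a node assignment."));
    }
}
```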
[jira] [Commented] (KAFKA-14328) KafkaAdminClient should be Changing the exception level When an exception occurs
[ https://issues.apache.org/jira/browse/KAFKA-14328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17625256#comment-17625256 ] Guozhang Wang commented on KAFKA-14328: --- Hello [~shizhenzhen], sorry for the late reply. I was OOO for a while. I reviewed the ticket and the latest PR, and I think your proposed change makes sense; it's less intrusive than logging a warn on each retry, since people would be concerned that it floods the log entries. I left some comments on the PR. > KafkaAdminClient should be Changing the exception level When an exception > occurs > > > Key: KAFKA-14328 > URL: https://issues.apache.org/jira/browse/KAFKA-14328 > Project: Kafka > Issue Type: Improvement > Components: admin >Affects Versions: 3.3 >Reporter: shizhenzhen >Priority: Major > Attachments: image-2022-10-21-11-19-21-064.png, > image-2022-10-21-14-56-31-753.png, image-2022-10-21-16-54-40-588.png, > image-2022-10-21-16-56-45-448.png, image-2022-10-21-16-58-19-353.png, > image-2022-10-24-14-28-10-365.png, image-2022-10-24-14-47-30-641.png, > image-2022-10-24-14-48-27-907.png > > > > > Some of KafkaAdminClient's logging is all at log.trace level. When an exception occurs, > you have no idea what caused it, which makes troubleshooting very difficult. > > For example, here: when a Metadata request is sent, if a returned topic has a partition with > leader=-1, an exception is thrown; > > but at that point the exception is effectively swallowed. After being thrown upward, it reaches > the catch block shown in the second screenshot below, which puts the request back into the request queue. > The client then falls into endless retries until the timeout is reached and this exception is thrown: Timed out waiting for a node > assignment. Call: metadata > > No node can be assigned to the Metadata request, and normally nobody would know that the real underlying exception is actually > > ``` > org.apache.kafka.common.errors.LeaderNotAvailableException: There is no > leader for this topic-partition as we are in the middle of a leadership > election. > > ``` > > > > > !https://user-images.githubusercontent.com/10442648/196944422-e11b732f-6f7f-4f77-8d9c-1f0544257461.png! > > > > The screenshot below shows the log after I changed it to warn level. > !image-2022-10-21-11-19-21-064.png! > > So I hope the log.trace here can be changed to log.warn, as a reminder > that the current retry may be caused by some underlying exception. > > > > > > !image-2022-10-21-14-56-31-753.png! > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12783: KAFKA-14334: complete delayed purgatory after replication
jeffkbkim commented on code in PR #12783: URL: https://github.com/apache/kafka/pull/12783#discussion_r1007253784 ## core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala: ## @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package integration.kafka.server + +import kafka.server.{BaseFetchRequestTest, BrokerTopicStats, KafkaConfig} +import kafka.utils.{TestInfoUtils, TestUtils} +import org.apache.kafka.clients.admin.Admin +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.protocol.ApiKeys +import org.junit.jupiter.api.{AfterEach, BeforeEach, Timeout} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource + +import java.util.Properties + +class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest { + val numNodes = 2 + val numParts = 1 + val initialMessages = 100 + val nMessages = 100 + + val topic = "test-fetch-from-follower" + val leaderBrokerId = 0 + val followerBrokerId = 1 + var admin: Admin = null + + def overridingProps: Properties = { +val props = new Properties +props.put(KafkaConfig.NumPartitionsProp, numParts.toString) + +props + } + + override def generateConfigs: collection.Seq[KafkaConfig] = +TestUtils.createBrokerConfigs(numNodes, zkConnectOrNull, enableControlledShutdown = false, enableFetchFromFollower = true) + .map(KafkaConfig.fromProps(_, overridingProps)) + + @BeforeEach + def initializeFetchFromFollowerCluster(): Unit = { +// Create a 2 broker cluster where broker 0 is the leader and 1 is the follower. + +admin = TestUtils.createAdminClient(brokers, listenerName) +TestUtils.createTopicWithAdminRaw( + admin, + topic, + replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId)) +) +TestUtils.generateAndProduceMessages(brokers, topic, initialMessages) + } + + @AfterEach + def close(): Unit = { +if (admin != null) { + admin.close() +} + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + @Timeout(30) + def testFollowerCompleteDelayedPurgatoryOnReplication(quorum: String): Unit = { +TestUtils.generateAndProduceMessages(brokers, topic, nMessages) +// set fetch.max.wait.ms to a value (45 seconds) greater than the timeout (30 seconds) to ensure that the +// test only passes when the delayed fetch purgatory is completed after successfully replicating from the leader. + +val totalMessages = initialMessages + nMessages +val topicPartition = new TopicPartition(topic, 0) +val offsetMap = Map[TopicPartition, Long]( + topicPartition -> (totalMessages - 1) Review Comment: the suggestion makes sense. i think the point you're getting at is we need to have the fetch sit in purgatory before replication. 
and i confirmed your suggestion works locally. however, i'm still unable to grasp why the current logic fails. let's say "there is no record at offset 200 yet" which means replication is not complete. the fetch request will sit in the purgatory and wake up once the record is replicated. could it be when the record at offset 200 is replicated, the follower's HWM is not updated yet to 200 so the fetch request sits in purgatory and never wakes up because there are no new records? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12783: KAFKA-14334: complete delayed purgatory after replication
jeffkbkim commented on code in PR #12783: URL: https://github.com/apache/kafka/pull/12783#discussion_r1007253784 ## core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala: ## @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package integration.kafka.server + +import kafka.server.{BaseFetchRequestTest, BrokerTopicStats, KafkaConfig} +import kafka.utils.{TestInfoUtils, TestUtils} +import org.apache.kafka.clients.admin.Admin +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.protocol.ApiKeys +import org.junit.jupiter.api.{AfterEach, BeforeEach, Timeout} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource + +import java.util.Properties + +class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest { + val numNodes = 2 + val numParts = 1 + val initialMessages = 100 + val nMessages = 100 + + val topic = "test-fetch-from-follower" + val leaderBrokerId = 0 + val followerBrokerId = 1 + var admin: Admin = null + + def overridingProps: Properties = { +val props = new Properties +props.put(KafkaConfig.NumPartitionsProp, numParts.toString) + +props + } + + override def generateConfigs: collection.Seq[KafkaConfig] = +TestUtils.createBrokerConfigs(numNodes, zkConnectOrNull, enableControlledShutdown = false, enableFetchFromFollower = true) + .map(KafkaConfig.fromProps(_, overridingProps)) + + @BeforeEach + def initializeFetchFromFollowerCluster(): Unit = { +// Create a 2 broker cluster where broker 0 is the leader and 1 is the follower. + +admin = TestUtils.createAdminClient(brokers, listenerName) +TestUtils.createTopicWithAdminRaw( + admin, + topic, + replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId)) +) +TestUtils.generateAndProduceMessages(brokers, topic, initialMessages) + } + + @AfterEach + def close(): Unit = { +if (admin != null) { + admin.close() +} + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + @Timeout(30) + def testFollowerCompleteDelayedPurgatoryOnReplication(quorum: String): Unit = { +TestUtils.generateAndProduceMessages(brokers, topic, nMessages) +// set fetch.max.wait.ms to a value (45 seconds) greater than the timeout (30 seconds) to ensure that the +// test only passes when the delayed fetch purgatory is completed after successfully replicating from the leader. + +val totalMessages = initialMessages + nMessages +val topicPartition = new TopicPartition(topic, 0) +val offsetMap = Map[TopicPartition, Long]( + topicPartition -> (totalMessages - 1) Review Comment: the suggestion makes sense. i think the point you're getting at is we need to have the fetch sit in purgatory before replication. 
And I confirmed your suggestion works locally. However, I'm still unable to grasp why the current logic fails. Let's say "there is no record at offset 200 yet", which means replication is not complete. The fetch request will sit in the purgatory and wake up once the record is replicated. Could it be the case where the record at offset 200 is already replicated but the follower's HWM has not yet been updated to 200, so the fetch request sits in purgatory and never wakes up because there are no new records? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #12793: [KAFKA-14328]: KafkaAdminClient should be Changing the exception level …
guozhangwang commented on PR #12793: URL: https://github.com/apache/kafka/pull/12793#issuecomment-1293946483 @shirenchuang the checkstyle failures are related to the coding styles in the PR, you can run the commands locally (there's instructions in `README.md`) to check again when you update the PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cowlike commented on a diff in pull request #12769: KAFKA-14314: Add check for null upstreamTopic
cowlike commented on code in PR #12769: URL: https://github.com/apache/kafka/pull/12769#discussion_r1007313398 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java: ## @@ -319,4 +320,17 @@ public void testRefreshTopicPartitionsTopicOnTargetFirst() throws Exception { connector.refreshTopicPartitions(); verify(connector, times(1)).computeAndCreateTopicPartitions(); } + +@Test +public void testIsCycleWithNullUpstreamTopic() throws Exception { +class BadReplicationPolicy extends DefaultReplicationPolicy { Review Comment: > What about `CustomReplicationPolicy` as it's not really a bad policy, `null` is allowed `CustomReplicationPolicy` will not fail the test since it ends up return `false` after the first line of `isCycle` returns null: ```java boolean isCycle(String topic) { String source = replicationPolicy.topicSource(topic); //... ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cowlike commented on a diff in pull request #12769: KAFKA-14314: Add check for null upstreamTopic
cowlike commented on code in PR #12769: URL: https://github.com/apache/kafka/pull/12769#discussion_r1007313398 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java: ## @@ -319,4 +320,17 @@ public void testRefreshTopicPartitionsTopicOnTargetFirst() throws Exception { connector.refreshTopicPartitions(); verify(connector, times(1)).computeAndCreateTopicPartitions(); } + +@Test +public void testIsCycleWithNullUpstreamTopic() throws Exception { +class BadReplicationPolicy extends DefaultReplicationPolicy { Review Comment: > What about `CustomReplicationPolicy` as it's not really a bad policy, `null` is allowed `CustomReplicationPolicy` will not fail the test since it ends up returning `false` after the first line of `isCycle` returns null: ```java boolean isCycle(String topic) { String source = replicationPolicy.topicSource(topic); //... ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
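For context, a rough sketch of the kind of replication policy the test above needs is shown below; this is an illustrative assumption, not the actual test class from the PR. The idea is that the policy must let `isCycle` get past its first `topicSource` call and then return `null` from `upstreamTopic`, which is the case the KAFKA-14314 null check guards against.

```java
import org.apache.kafka.connect.mirror.DefaultReplicationPolicy;

// Illustrative sketch only; the real BadReplicationPolicy in the PR may differ.
// It inherits topicSource() from DefaultReplicationPolicy, so a remote-looking
// topic name still yields a non-null source, while upstreamTopic() is forced to
// return null to exercise the code path the fix protects.
class BadReplicationPolicy extends DefaultReplicationPolicy {
    @Override
    public String upstreamTopic(String topic) {
        return null; // simulate a policy that cannot resolve the upstream topic
    }
}
```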
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r1007411497 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeLocalMetadataStore.java: ## @@ -0,0 +1,15 @@ +package org.apache.kafka.connect.mirror.clients.admin; Review Comment: Added the license ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeForwardingAdminWithLocalMetadata.java: ## @@ -0,0 +1,117 @@ +package org.apache.kafka.connect.mirror.clients.admin; Review Comment: Added the license -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r1007412233 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java: ## @@ -0,0 +1,596 @@ +/* Review Comment: Fixed the checkstyle; `./gradlew checkstyleMain checkstyleTest` should now be green. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r1007424805 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java: ## @@ -0,0 +1,596 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.mirror.integration; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.DescribeConfigsResult; +import org.apache.kafka.clients.admin.NewPartitions; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AccessControlEntryFilter; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourcePatternFilter; +import org.apache.kafka.common.resource.ResourceType; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter; +import org.apache.kafka.connect.mirror.MirrorCheckpointConnector; +import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector; +import org.apache.kafka.connect.mirror.MirrorMakerConfig; +import org.apache.kafka.connect.mirror.MirrorSourceConnector; +import org.apache.kafka.connect.mirror.SourceAndTarget; +import org.apache.kafka.connect.mirror.clients.admin.FakeForwardingAdminWithLocalMetadata; +import org.apache.kafka.connect.mirror.clients.admin.FakeLocalMetadataStore; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; +import org.apache.kafka.connect.util.clusters.UngracefulShutdownException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; 
+import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.FORWARDING_ADMIN_CLASS; +import static org.apache.kafka.connect.mirror.TestUtils.generateRecords; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX; +import static org.apache.kafka.test.TestUtils.waitForCondition; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests MM2 is using provided ForwardingAdmin to create/alter topics, partitions and ACLs. + */ +@Tag("integration") +public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest { +protected static final int NUM_RECORDS_PER_PARTITION = 1; +protected static final int NUM_PARTITIONS = 1; +protected static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS * NUM_RECORDS_PER_PARTITION; +protected static final Duration CONSUMER_POLL_TIMEOUT_MS = Duration.ofMillis(500L); +protected static final String PRIMARY_CLUSTER_ALIAS = "pr
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r1007434322 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java: ## @@ -0,0 +1,596 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.mirror.integration; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.DescribeConfigsResult; +import org.apache.kafka.clients.admin.NewPartitions; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AccessControlEntryFilter; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourcePatternFilter; +import org.apache.kafka.common.resource.ResourceType; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter; +import org.apache.kafka.connect.mirror.MirrorCheckpointConnector; +import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector; +import org.apache.kafka.connect.mirror.MirrorMakerConfig; +import org.apache.kafka.connect.mirror.MirrorSourceConnector; +import org.apache.kafka.connect.mirror.SourceAndTarget; +import org.apache.kafka.connect.mirror.clients.admin.FakeForwardingAdminWithLocalMetadata; +import org.apache.kafka.connect.mirror.clients.admin.FakeLocalMetadataStore; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; +import org.apache.kafka.connect.util.clusters.UngracefulShutdownException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; 
+import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.FORWARDING_ADMIN_CLASS; +import static org.apache.kafka.connect.mirror.TestUtils.generateRecords; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX; +import static org.apache.kafka.test.TestUtils.waitForCondition; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests MM2 is using provided ForwardingAdmin to create/alter topics, partitions and ACLs. + */ +@Tag("integration") +public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest { +protected static final int NUM_RECORDS_PER_PARTITION = 1; +protected static final int NUM_PARTITIONS = 1; +protected static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS * NUM_RECORDS_PER_PARTITION; +protected static final Duration CONSUMER_POLL_TIMEOUT_MS = Duration.ofMillis(500L); +protected static final String PRIMARY_CLUSTER_ALIAS = "pr
[jira] [Commented] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance
[ https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17625360#comment-17625360 ] A. Sophie Blee-Goldman commented on KAFKA-14016: [~aiquestion] would you be interested in submitting a PR for reverting KAFKA-13891? It still needs a PR, but Luke or I can merge it as soon as the PR build finishes running. In the meantime I'll look into the assignor's handling of this kind of thing and submit a patch if we're missing the logic for this (which from your report, it sounds like we are). > Revoke more partitions than expected in Cooperative rebalance > - > > Key: KAFKA-14016 > URL: https://issues.apache.org/jira/browse/KAFKA-14016 > Project: Kafka > Issue Type: Bug > Components: clients > Affects Versions: 3.3.0 > Reporter: Shawn Wang > Priority: Major > Labels: new-rebalance-should-fix > > In https://issues.apache.org/jira/browse/KAFKA-13419 we found that some > consumers didn't reset generation and state after a sync group failure with a > REBALANCE_IN_PROGRESS error. > So we fixed it by resetting the generationId (but not the memberId) when sync group fails with a > REBALANCE_IN_PROGRESS error. > But this change missed the reset part, so another change made in > https://issues.apache.org/jira/browse/KAFKA-13891 makes this work. > After applying this change, we found that a consumer will sometimes revoke > almost 2/3 of its partitions with cooperative rebalancing enabled, because if a consumer > does a very quick re-join, other consumers will get REBALANCE_IN_PROGRESS in > syncGroup and revoke their partitions before re-joining. Example: > # consumers A1-A10 (ten consumers) joined and synced the group successfully with > generation 1 > # new consumer B1 joined and started a rebalance > # all consumers joined successfully, and then A1 needed to revoke a partition to > transfer it to B1 > # A1 did a very quick syncGroup and re-join, because it revoked a partition > # A2-A10 didn't send syncGroup before A1 re-joined, so after they sent > syncGroup, they got REBALANCE_IN_PROGRESS > # A2-A10 then revoked their partitions and re-joined > So in this rebalance almost every partition is revoked, which greatly reduces > the benefit of cooperative rebalancing. > I think instead of "{*}resetStateAndRejoin{*} when > *RebalanceInProgressException* errors happen in {*}sync group{*}" we need > another way to fix it. > Here is my proposal: > # revert the change in https://issues.apache.org/jira/browse/KAFKA-13891 > # in the server-side coordinator's handleSyncGroup, when the generationId has been checked and the group > state is PreparingRebalance, we can send the assignment along with the error > code REBALANCE_IN_PROGRESS (I think it's safe since we verified the > generation first) > # when the client gets the REBALANCE_IN_PROGRESS error, try to apply the > assignment first and then set rejoinNeeded = true to make it re-join > immediately -- This message was sent by Atlassian Jira (v8.20.10#820010)
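To make step 3 of the proposal above concrete, here is a minimal, self-contained sketch of the intended client-side behaviour. The names below (`SyncResult`, `rejoinNeeded`, `applyAssignment`) are placeholders rather than the actual consumer-coordinator internals; the sketch only illustrates applying the returned assignment before re-joining instead of resetting state.

```java
import java.nio.ByteBuffer;

// Placeholder sketch of the proposed handling of REBALANCE_IN_PROGRESS in syncGroup;
// not actual Kafka client code.
class SyncGroupRebalanceInProgressSketch {
    enum Error { NONE, REBALANCE_IN_PROGRESS }

    static final class SyncResult {
        final Error error;
        final ByteBuffer assignment; // per the proposal, may be populated even on REBALANCE_IN_PROGRESS
        SyncResult(Error error, ByteBuffer assignment) {
            this.error = error;
            this.assignment = assignment;
        }
    }

    boolean rejoinNeeded = false;

    void onSyncGroupResponse(SyncResult result) {
        if (result.error == Error.REBALANCE_IN_PROGRESS) {
            if (result.assignment != null) {
                applyAssignment(result.assignment); // keep currently owned partitions instead of revoking them
            }
            rejoinNeeded = true; // re-join immediately, without a full state reset
            return;
        }
        // Normal path: apply the assignment and clear the rejoin flag.
        applyAssignment(result.assignment);
        rejoinNeeded = false;
    }

    private void applyAssignment(ByteBuffer assignment) {
        // Placeholder: decode the assignment and update the owned-partition set.
    }
}
```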
[GitHub] [kafka] Scanteianu commented on pull request #12753: MINOR: Document Offset and Partition 0-indexing, fix typo
Scanteianu commented on PR #12753: URL: https://github.com/apache/kafka/pull/12753#issuecomment-1294252160 > @Scanteianu , thanks for the update, but there is still something wrong. For the test, I don't think another test in MockConsumer is needed or correct. After all, MockConsumer is a `Mock` one; some behaviors don't match the real consumer. For the seek test, we already have one here: `PlaintextConsumerTest#testSeek`. If you think there are some tests missing, you can add them in that test. > > BTW, `PlaintextConsumerTest` is an integration test, so it'll launch real brokers and consumers to do tests. You can test it there. > > Thanks. @showuon thanks for the review, I really appreciate you taking the time to look at it and point me to the right places in the code to add tests, as well as the configs that specify the behaviour! I have worked with people who believe mocks should be as close to the behaviour of the "real deal" as possible (and am partial to this opinion myself), but I'll maybe open another PR for that, as I think that is more of a stylistic opinion (whether Apache has this codified somewhere, I don't know), and I'd really like to improve the seek documentation, as I spent quite a few hours googling for this stuff. For now, I've moved the tests to the Plaintext Consumer and added tests for the various auto.offset.reset configs. The behaviours with `latest` seem super weird to me, based on your explanation - they don't seem to match https://medium.com/lydtech-consulting/kafka-consumer-auto-offset-reset-d3962bad2665, specifically for the `latest` value. In the case of `latest`, if the next batch of records contains the offset that is passed to seek, it seems to skip to that offset, which is the same behaviour that the mock consumer had, and which kind of makes sense. However, if the next batch of records doesn't contain the offset last passed to seek, it resets to the very beginning (offset 0). Either I am missing something, or this might be a bug? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
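To make the `seek` and `auto.offset.reset` interaction discussed above easier to reproduce outside the test suite, here is a minimal consumer snippet; the broker address, topic name, and offset are illustrative placeholders. The open question in the comment above is what `poll` returns after a `seek` when `auto.offset.reset=latest` and the requested offset is not in the fetched range.

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

// Minimal illustration of seeking with auto.offset.reset=latest; all values are placeholders.
public class SeekExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "seek-example");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // the setting whose edge cases are questioned above

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0);
            consumer.assign(Collections.singletonList(tp));
            consumer.seek(tp, 5L); // seek to an arbitrary offset; the reset policy applies if it is out of range
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(r -> System.out.println(r.offset() + ": " + r.value()));
        }
    }
}
```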
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12783: KAFKA-14334: complete delayed purgatory after replication
jeffkbkim commented on code in PR #12783: URL: https://github.com/apache/kafka/pull/12783#discussion_r1007536584 ## core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala: ## @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package integration.kafka.server + +import kafka.server.{BaseFetchRequestTest, BrokerTopicStats, KafkaConfig} +import kafka.utils.{TestInfoUtils, TestUtils} +import org.apache.kafka.clients.admin.Admin +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.protocol.ApiKeys +import org.junit.jupiter.api.{AfterEach, BeforeEach, Timeout} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource + +import java.util.Properties + +class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest { + val numNodes = 2 + val numParts = 1 + val initialMessages = 100 + val nMessages = 100 + + val topic = "test-fetch-from-follower" + val leaderBrokerId = 0 + val followerBrokerId = 1 + var admin: Admin = null + + def overridingProps: Properties = { +val props = new Properties +props.put(KafkaConfig.NumPartitionsProp, numParts.toString) + +props + } + + override def generateConfigs: collection.Seq[KafkaConfig] = +TestUtils.createBrokerConfigs(numNodes, zkConnectOrNull, enableControlledShutdown = false, enableFetchFromFollower = true) + .map(KafkaConfig.fromProps(_, overridingProps)) + + @BeforeEach + def initializeFetchFromFollowerCluster(): Unit = { +// Create a 2 broker cluster where broker 0 is the leader and 1 is the follower. + +admin = TestUtils.createAdminClient(brokers, listenerName) +TestUtils.createTopicWithAdminRaw( + admin, + topic, + replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId)) +) +TestUtils.generateAndProduceMessages(brokers, topic, initialMessages) + } + + @AfterEach + def close(): Unit = { +if (admin != null) { + admin.close() +} + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + @Timeout(30) + def testFollowerCompleteDelayedPurgatoryOnReplication(quorum: String): Unit = { +TestUtils.generateAndProduceMessages(brokers, topic, nMessages) +// set fetch.max.wait.ms to a value (45 seconds) greater than the timeout (30 seconds) to ensure that the +// test only passes when the delayed fetch purgatory is completed after successfully replicating from the leader. 
+ +val totalMessages = initialMessages + nMessages +val topicPartition = new TopicPartition(topic, 0) +val offsetMap = Map[TopicPartition, Long]( + topicPartition -> (totalMessages - 1) Review Comment: OK, looking deeper into this, here's my understanding as to why fetching offset 200 did not work: the log end offset points to the next offset at which the log will append a record, so technically that offset doesn't exist yet. Furthermore, the high watermark is updated to the log end offset in `Partition.maybeIncrementLeaderHW()`: ``` var newHighWatermark = leaderLogEndOffset ... leaderLog.maybeIncrementHighWatermark(newHighWatermark) ``` Even if the fetch offset is equal to the high watermark, we cannot fetch it while the high watermark is pointing to the log end offset. This is handled in DelayedFetch, where we skip accumulating bytes `if (endOffset.messageOffset (LEO) == fetchOffset.messageOffset (HWM))`. @dajac does that align with your understanding? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
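A small, self-contained model of the condition described above may help; this is not the broker's actual `DelayedFetch` code, and all names are illustrative. It only captures the point that a fetch at an offset equal to the high watermark cannot accumulate any bytes while the high watermark also equals the log end offset, so the delayed fetch stays in purgatory until replication appends new records and both offsets advance.

```java
// Toy model of the delayed-fetch completion condition discussed above; illustrative only.
class DelayedFetchModel {
    long logEndOffset;   // next offset the log will write to (LEO)
    long highWatermark;  // highest offset exposed to fetchers (HWM)

    DelayedFetchModel(long logEndOffset, long highWatermark) {
        this.logEndOffset = logEndOffset;
        this.highWatermark = highWatermark;
    }

    // No bytes can be accumulated when the fetch offset has already reached the HWM,
    // i.e. there is nothing readable beyond it.
    boolean canCompleteFetch(long fetchOffset) {
        return highWatermark > fetchOffset;
    }

    public static void main(String[] args) {
        // 200 messages replicated: LEO == HWM == 200, and the consumer fetches at offset 200.
        DelayedFetchModel follower = new DelayedFetchModel(200, 200);
        System.out.println(follower.canCompleteFetch(200)); // false -> the fetch sits in purgatory

        // After one more record is replicated and the HWM advances, the fetch can complete.
        follower.logEndOffset = 201;
        follower.highWatermark = 201;
        System.out.println(follower.canCompleteFetch(200)); // true
    }
}
```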
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12783: KAFKA-14334: complete delayed purgatory after replication
jeffkbkim commented on code in PR #12783: URL: https://github.com/apache/kafka/pull/12783#discussion_r1007544878 ## core/src/main/scala/kafka/server/ReplicaFetcherThread.scala: ## @@ -132,9 +141,17 @@ class ReplicaFetcherThread(name: String, brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes) +logAppendInfo.foreach { _ => partitionsWithNewRecords += topicPartition } Review Comment: I believe that is the case. From [UnifiedLog.updateHighWatermarkWithLogEndOffset()](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/UnifiedLog.scala#L597-L603): ``` private def updateHighWatermarkWithLogEndOffset(): Unit = { // Update the high watermark in case it has gotten ahead of the log end offset following a truncation // or if a new segment has been rolled and the offset metadata needs to be updated. if (highWatermark >= localLog.logEndOffset) { updateHighWatermarkMetadata(localLog.logEndOffsetMetadata) } } ``` This is called on every `UnifiedLog.append()` and `UnifiedLog.roll()`. However, I did notice that `LeaderHwChange` is not set anywhere else except in the produce code path. From `Partition.appendRecordsToLeader()`: ``` info.copy(leaderHwChange = if (leaderHWIncremented) LeaderHwChange.Increased else LeaderHwChange.Same) ``` No other `LogAppendInfo` initialization passes in a `LeaderHwChange`. So the follower would need to keep track of whether the high watermark changed for each partition, which I don't think is something we want to do. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12783: KAFKA-14334: complete delayed purgatory after replication
jeffkbkim commented on code in PR #12783: URL: https://github.com/apache/kafka/pull/12783#discussion_r1007549176 ## core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala: ## @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package integration.kafka.server + +import kafka.server.{BaseFetchRequestTest, BrokerTopicStats, KafkaConfig} +import kafka.utils.{TestInfoUtils, TestUtils} +import org.apache.kafka.clients.admin.Admin +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.protocol.ApiKeys +import org.junit.jupiter.api.{AfterEach, BeforeEach, Timeout} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource + +import java.util.Properties + +class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest { + val numNodes = 2 + val numParts = 1 + val initialMessages = 100 + val nMessages = 100 + + val topic = "test-fetch-from-follower" + val leaderBrokerId = 0 + val followerBrokerId = 1 + var admin: Admin = null + + def overridingProps: Properties = { +val props = new Properties +props.put(KafkaConfig.NumPartitionsProp, numParts.toString) + +props + } + + override def generateConfigs: collection.Seq[KafkaConfig] = +TestUtils.createBrokerConfigs(numNodes, zkConnectOrNull, enableControlledShutdown = false, enableFetchFromFollower = true) + .map(KafkaConfig.fromProps(_, overridingProps)) + + @BeforeEach + def initializeFetchFromFollowerCluster(): Unit = { +// Create a 2 broker cluster where broker 0 is the leader and 1 is the follower. + +admin = TestUtils.createAdminClient(brokers, listenerName) +TestUtils.createTopicWithAdminRaw( + admin, + topic, + replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId)) +) +TestUtils.generateAndProduceMessages(brokers, topic, initialMessages) + } + + @AfterEach + def close(): Unit = { +if (admin != null) { + admin.close() +} + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + @Timeout(30) + def testFollowerCompleteDelayedPurgatoryOnReplication(quorum: String): Unit = { +TestUtils.generateAndProduceMessages(brokers, topic, nMessages) +// set fetch.max.wait.ms to a value (45 seconds) greater than the timeout (30 seconds) to ensure that the +// test only passes when the delayed fetch purgatory is completed after successfully replicating from the leader. + +val totalMessages = initialMessages + nMessages +val topicPartition = new TopicPartition(topic, 0) +val offsetMap = Map[TopicPartition, Long]( + topicPartition -> (totalMessages - 1) Review Comment: also, i confirmed that the existing test fails when the follower is already up-to-date. 
The test above should work in general because the fetch request reaches the follower before the follower is fully replicated. Will update the test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] opencmit2 commented on pull request #12788: [Test]Testing Gradle OOM
opencmit2 commented on PR #12788: URL: https://github.com/apache/kafka/pull/12788#issuecomment-1294428194 Note: in DefaultCachedClasspathTransformer, the latest code has been modified to limit the number of threads, tying it to the number of CPU cores. The other option is to limit the number of Gradle jar transfer threads. We can think about this from these two aspects. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org