[GitHub] [kafka] sudeshwasnik commented on a diff in pull request #12784: MINOR: Do not perform prodcuerCommit on serializationError when trying offsetWriter flush

2022-10-27 Thread GitBox


sudeshwasnik commented on code in PR #12784:
URL: https://github.com/apache/kafka/pull/12784#discussion_r1006513264


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##
@@ -280,15 +282,16 @@ private void commitTransaction() {
 
 // Commit the transaction
 // Blocks until all outstanding records have been sent and ack'd
-        try {
-            producer.commitTransaction();
-        } catch (Throwable t) {
-            log.error("{} Failed to commit producer transaction", ExactlyOnceWorkerSourceTask.this, t);
-            flushError.compareAndSet(null, t);
+        if (flushFuture != null) {

Review Comment:
   Started with the same thought, but tried making it consistent with 
[WorkerSourceTask](https://github.com/apache/kafka/blob/3ae1afa43838066e44ea78918050c6780c208042/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L253-L267).
   
   If there are no offsets and no error, we should still commit (producer and task), so we need to make changes in WorkerSourceTask as well.
   I'll make the suggested change!
   Thanks!
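   
   For context, a rough sketch of the control flow being discussed (the field names mirror the diff above, but the error handling itself is illustrative rather than the actual patch):
   
   ```java
   // Sketch only: skip the producer commit if the offset flush already failed,
   // but still commit when there are simply no offsets to flush.
   Throwable error = flushError.get();
   if (error != null) {
       // e.g. a record's offsets failed to serialize; committing now would hide that error
       throw new ConnectException("Failed to flush offsets before committing the transaction", error);
   }
   // No offsets and no error: still commit so the records already sent in this
   // transaction become visible and the task can make progress.
   producer.commitTransaction();
   ```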



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14337) topic name with "." cannot be created after deletion

2022-10-27 Thread thanhnd96 (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17624931#comment-17624931
 ] 

thanhnd96 commented on KAFKA-14337:
---

Hi admin,

Thank you for the fix. After I applied the change from [GitHub Pull Request 
#12790|https://github.com/apache/kafka/pull/12790], it worked when building Kafka from the 
Source download: 
[kafka-3.3.1-src.tgz|https://downloads.apache.org/kafka/3.3.1/kafka-3.3.1-src.tgz]

But can I apply the fix to the Binary download: Scala 2.13 - 
[kafka_2.13-3.3.1.tgz|https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz]
 ([asc|https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz.asc], 
[sha512|https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz.sha512]) 
on my Kafka?

I deploy to 3 Kafka brokers running in KRaft mode.

Best regards!

> topic name with "." cannot be created after deletion
> 
>
> Key: KAFKA-14337
> URL: https://issues.apache.org/jira/browse/KAFKA-14337
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.3.1
>Reporter: thanhnd96
>Assignee: Luke Chen
>Priority: Critical
>
> Hi admin,
> My issue is that after I create topics like topic.AAA or Topic.AAA.01 and then delete one 
> of the two topics,
> I can't create one of the two topics again.
> But if I create topic test123, then delete and recreate it, that works fine.
> This is the log from when I tried to create topic.AAA:
> WARN [Controller 1] createTopics: failed with unknown server exception 
> NoSuchElementException at epoch 14 in 193 us.  Renouncing leadership and 
> reverting to the last committed offset 28. 
> (org.apache.kafka.controller.QuorumController)
> java.util.NoSuchElementException
>         at 
> org.apache.kafka.timeline.SnapshottableHashTable$CurrentIterator.next(SnapshottableHashTable.java:167)
>         at 
> org.apache.kafka.timeline.SnapshottableHashTable$CurrentIterator.next(SnapshottableHashTable.java:139)
>         at 
> org.apache.kafka.timeline.TimelineHashSet$ValueIterator.next(TimelineHashSet.java:120)
>         at 
> org.apache.kafka.controller.ReplicationControlManager.validateNewTopicNames(ReplicationControlManager.java:799)
>         at 
> org.apache.kafka.controller.ReplicationControlManager.createTopics(ReplicationControlManager.java:567)
>         at 
> org.apache.kafka.controller.QuorumController.lambda$createTopics$7(QuorumController.java:1832)
>         at 
> org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:767)
>         at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121)
>         at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200)
>         at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173)
>         at java.base/java.lang.Thread.run(Thread.java:829)
> ERROR [Controller 1] processBrokerHeartbeat: unable to start processing 
> because of NotControllerException. 
> (org.apache.kafka.controller.QuorumController)
>  
> I'm running Kafka in KRaft mode!
> Thanks, admin.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13419) sync group failed with rebalanceInProgress error might cause out-of-date ownedPartition in Cooperative protocol

2022-10-27 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17624934#comment-17624934
 ] 

A. Sophie Blee-Goldman commented on KAFKA-13419:


Cool, thanks for confirming this is already handled in the assignor. Ok then, 
that said – or actually, _because_ of this, I do agree that we should revert 
https://issues.apache.org/jira/browse/KAFKA-13891. 

 

Maybe we can look into the logs to better understand the original problem you 
were experiencing [~aiquestion] – or were you not personally hitting the issue 
"fixed" by https://issues.apache.org/jira/browse/KAFKA-13891, just vigilant 
about submitting a patch when you noticed we had claimed to make that change in 
[https://github.com/apache/kafka/pull/11451] but seemingly forgot?

> sync group failed with rebalanceInProgress error might cause out-of-date 
> ownedPartition in Cooperative protocol
> ---
>
> Key: KAFKA-13419
> URL: https://issues.apache.org/jira/browse/KAFKA-13419
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.0
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.1.0
>
>
> In KAFKA-13406, we found that a user got stuck in rebalancing with the 
> cooperative sticky assignor. The reason is that the "ownedPartition" is 
> out-of-date, and it failed the cooperative assignment validation.
> Investigating deeper, I found the root cause is that we didn't reset generation and 
> state after a sync group failure. In KAFKA-12983, we fixed the issue that 
> onJoinPrepare is not called in the resetStateAndRejoin method, which caused the 
> ownedPartition not to get cleared. But there's another case where the 
> ownedPartition will be out-of-date. Here's the example:
>  # consumer A joined and synced the group successfully with generation 1
>  # a new rebalance started with generation 2; consumer A joined successfully, 
> but somehow consumer A doesn't send out its sync group request immediately
>  # the other consumers completed sync group successfully in generation 2, except 
> consumer A
>  # after consumer A sends out its sync group request, a new rebalance starts with 
> generation 3, so consumer A gets a REBALANCE_IN_PROGRESS error in the sync group 
> response
>  # when receiving REBALANCE_IN_PROGRESS, we re-join the group with 
> generation 3, but with the assignment (ownedPartition) from generation 1
>  # so now an out-of-date ownedPartition is sent, with unexpected results
>  
> We might want to do *resetStateAndRejoin* when *RebalanceInProgressException* 
> errors happen in *sync group*. Because when we get a sync group error, it 
> means the join group passed, and other consumers (and the leader) might already have 
> completed this round of rebalance. The assignment distribution this consumer 
> has is already out-of-date.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-10149) Do not prevent automatic preferred election when reassignment in progress

2022-10-27 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-10149:
-

Assignee: Shenglong Zhang

> Do not prevent automatic preferred election when reassignment in progress
> -
>
> Key: KAFKA-10149
> URL: https://issues.apache.org/jira/browse/KAFKA-10149
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Shenglong Zhang
>Priority: Major
> Fix For: 3.4.0
>
>
> Currently the controller will not do preferred leader elections automatically 
> when a reassignment is in progress. If a broker crashes or is restarted with 
> a reassignment in progress, this leaves the leadership massively skewed until 
> the reassignment completes. I am not sure if there is a good reason for this, 
> but it seems not ideal.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-13419) sync group failed with rebalanceInProgress error might cause out-of-date ownedPartition in Cooperative protocol

2022-10-27 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17624934#comment-17624934
 ] 

A. Sophie Blee-Goldman edited comment on KAFKA-13419 at 10/27/22 7:56 AM:
--

Cool, thanks for confirming this is already handled in the assignor. Ok then, 
that said – or actually, _because_ of this, I do agree that we should revert 
https://issues.apache.org/jira/browse/KAFKA-13891. 

 

Maybe we can look into the logs to better understand the original problem you 
were experiencing [~aiquestion] – or were you not personally hitting the issue 
"fixed" by https://issues.apache.org/jira/browse/KAFKA-13891, just vigilant 
about submitting a patch when you noticed we had mentioned making this change 
in [https://github.com/apache/kafka/pull/11451] but then apparently left it out?


was (Author: ableegoldman):
Cool, thanks for confirming this is already handled in the assignor. Ok then, 
that said – or actually, _because_ of this, I do agree that we should revert 
https://issues.apache.org/jira/browse/KAFKA-13891. 

 

Maybe we can look into the logs to better understand the original problem you 
were experiencing [~aiquestion] – or were you not personally hitting the issue 
"fixed" by https://issues.apache.org/jira/browse/KAFKA-13891, just vigilant 
about submitting a patch when you noticed we had claimed to make that change in 
[https://github.com/apache/kafka/pull/11451] but seemingly forgot?

> sync group failed with rebalanceInProgress error might cause out-of-date 
> ownedPartition in Cooperative protocol
> ---
>
> Key: KAFKA-13419
> URL: https://issues.apache.org/jira/browse/KAFKA-13419
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.0
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.1.0
>
>
> In KAFKA-13406, we found that a user got stuck in rebalancing with the 
> cooperative sticky assignor. The reason is that the "ownedPartition" is 
> out-of-date, and it failed the cooperative assignment validation.
> Investigating deeper, I found the root cause is that we didn't reset generation and 
> state after a sync group failure. In KAFKA-12983, we fixed the issue that 
> onJoinPrepare is not called in the resetStateAndRejoin method, which caused the 
> ownedPartition not to get cleared. But there's another case where the 
> ownedPartition will be out-of-date. Here's the example:
>  # consumer A joined and synced the group successfully with generation 1
>  # a new rebalance started with generation 2; consumer A joined successfully, 
> but somehow consumer A doesn't send out its sync group request immediately
>  # the other consumers completed sync group successfully in generation 2, except 
> consumer A
>  # after consumer A sends out its sync group request, a new rebalance starts with 
> generation 3, so consumer A gets a REBALANCE_IN_PROGRESS error in the sync group 
> response
>  # when receiving REBALANCE_IN_PROGRESS, we re-join the group with 
> generation 3, but with the assignment (ownedPartition) from generation 1
>  # so now an out-of-date ownedPartition is sent, with unexpected results
>  
> We might want to do *resetStateAndRejoin* when *RebalanceInProgressException* 
> errors happen in *sync group*. Because when we get a sync group error, it 
> means the join group passed, and other consumers (and the leader) might already have 
> completed this round of rebalance. The assignment distribution this consumer 
> has is already out-of-date.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mdedetrich commented on a diff in pull request #12781: KAFKA-14132; Replace EasyMock with Mockito in KafkaBasedLogTest

2022-10-27 Thread GitBox


mdedetrich commented on code in PR #12781:
URL: https://github.com/apache/kafka/pull/12781#discussion_r1006531902


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##
@@ -135,8 +161,7 @@ public class KafkaBasedLogTest {
 @SuppressWarnings("unchecked")
 @Before
 public void setUp() {
-        store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"},
-                TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, consumedCallback, time, initializer);
+        store = new MockedKafkaBasedLog(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, () -> null, consumedCallback, time, null);

Review Comment:
   > I think the @Mock private Runnable initializer is the incorrect signature 
here, and should be changed to be a Consumer mock. Otherwise the initializer 
value isn't behaving like a mock for the mocked kafka based log.
   
   So it appears that the `@Mock private Runnable initializer` is being used 
correctly in the `setupWithAdmin` method, and if you look at `setupWithAdmin` 
you can confirm it's correct because we aren't using a mock; instead we are actually 
using the raw constructor, i.e.
   
   ```java
   private void setupWithAdmin() {
       Supplier<TopicAdmin> adminSupplier = () -> admin;
       java.util.function.Consumer<TopicAdmin> initializer = admin -> { };
       store = new MockedKafkaBasedLog(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer);
   }
   ```
   
   I believe the oversight was just in the `setUp` of the mock.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] sudeshwasnik commented on a diff in pull request #12784: MINOR: Do not perform prodcuerCommit on serializationError when trying offsetWriter flush

2022-10-27 Thread GitBox


sudeshwasnik commented on code in PR #12784:
URL: https://github.com/apache/kafka/pull/12784#discussion_r1006513264


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##
@@ -280,15 +282,16 @@ private void commitTransaction() {
 
 // Commit the transaction
 // Blocks until all outstanding records have been sent and ack'd
-        try {
-            producer.commitTransaction();
-        } catch (Throwable t) {
-            log.error("{} Failed to commit producer transaction", ExactlyOnceWorkerSourceTask.this, t);
-            flushError.compareAndSet(null, t);
+        if (flushFuture != null) {

Review Comment:
   Started with the same thought, but tried making it consistent with 
[WorkerSourceTask](https://github.com/apache/kafka/blob/3ae1afa43838066e44ea78918050c6780c208042/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L253-L267).
 
   
   If there are no offsets and no errors, we should still commit (producer and 
task) +1 
   I'll make the suggested change! 
   thanks! 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] shirenchuang opened a new pull request, #12793: [KAFKA-14328]: KafkaAdminClient should be Changing the exception level …

2022-10-27 Thread GitBox


shirenchuang opened a new pull request, #12793:
URL: https://github.com/apache/kafka/pull/12793

   [ISSUE: KAFKA-14328](https://issues.apache.org/jira/browse/KAFKA-14328)
   
   PR:
   Return the real error along with the timeout error.
   
   In the pending-call timeout scenario, record the exception from the last retry and print it when the call finally times out.
   This helps users locate problems.
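   
   Roughly, the idea looks like the following sketch (the helper names `deadlineReached`, `attemptCall`, and `backoff` are placeholders, not the real KafkaAdminClient internals):
   
   ```java
   // Remember why the most recent attempt failed ...
   Throwable lastRetryException = null;
   while (!deadlineReached()) {
       try {
           return attemptCall();
       } catch (RetriableException e) {
           lastRetryException = e;
           backoff();
       }
   }
   // ... and surface it as the cause instead of a bare timeout message.
   throw new TimeoutException("Timed out waiting for a node assignment. Call: metadata",
           lastRetryException);
   ```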
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on a diff in pull request #12781: KAFKA-14132; Replace EasyMock with Mockito in KafkaBasedLogTest

2022-10-27 Thread GitBox


mdedetrich commented on code in PR #12781:
URL: https://github.com/apache/kafka/pull/12781#discussion_r1006543034


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##
@@ -547,32 +538,18 @@ public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() throws Exception
     private void setupWithAdmin() {
         Supplier<TopicAdmin> adminSupplier = () -> admin;
         java.util.function.Consumer<TopicAdmin> initializer = admin -> { };
-        store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"},
-                TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer);
+        store = new MockedKafkaBasedLog(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer);
     }
 
-    private void expectProducerAndConsumerCreate() throws Exception {
-        PowerMock.expectPrivate(store, "createProducer")
-                 .andReturn(producer);
-        PowerMock.expectPrivate(store, "createConsumer")
-                 .andReturn(consumer);
-    }
-
-    private void expectStart() throws Exception {
+    private void expectStart() {
         initializer.run();
-        EasyMock.expectLastCall().times(1);
-
-        expectProducerAndConsumerCreate();
+        verify(initializer, times(1)).run();
     }
 
     private void expectStop() {
         producer.close();
-        PowerMock.expectLastCall();
+        verify(producer, times(1)).close();

Review Comment:
   > Does the producer.close() call trivially satisfy the following verify() 
call?
   
   So to be honest I don't know what's going on here. The literal translation of
   
   ```java
   producer.close();
   PowerMock.expectLastCall();
   ```
   to Mockito is
   ```java
   producer.close();
   verify(producer, times(1)).close();
   ```
   
   So to answer your question: this does seem fishy, because of course if we call 
`producer.close()` we expect it to be called, unless some exception is thrown 
(which would fail the test anyway).
   
   > Also it looks like this method is not really translated from easymock yet, 
as it is separate from the store.stop() calls throughout the test. I think that 
instead of calling store.stop(), the test should call expectStop, and then the 
expectStop calls store.stop and asserts that the shutdown procedure happens.
   
   Are you alluding to the fact that the equivalent of `PowerMock.verifyAll();` is 
missing here? If that's so, then yes, this is indeed the case, and it's one of 
the problems with PowerMock/EasyMock: it's not that explicit. 
`PowerMock.verifyAll()` verifies methods that are directly called (which is 
pointless almost all of the time), whereas ideally you are meant to use 
`verify` on methods that are called indirectly by the test, to ensure that 
they are actually executed. 
   
   So I guess to answer your question more directly: I can see what you are saying, 
but I would like to know what exactly a "shutdown procedure" actually means in 
terms of mocking, i.e. which methods (aside from `store.stop()`, which we 
call directly) should we be verifying? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on a diff in pull request #12781: KAFKA-14132; Replace EasyMock with Mockito in KafkaBasedLogTest

2022-10-27 Thread GitBox


mdedetrich commented on code in PR #12781:
URL: https://github.com/apache/kafka/pull/12781#discussion_r1006543034


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##
@@ -547,32 +538,18 @@ public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() throws Exception
     private void setupWithAdmin() {
         Supplier<TopicAdmin> adminSupplier = () -> admin;
         java.util.function.Consumer<TopicAdmin> initializer = admin -> { };
-        store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"},
-                TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer);
+        store = new MockedKafkaBasedLog(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer);
     }
 
-    private void expectProducerAndConsumerCreate() throws Exception {
-        PowerMock.expectPrivate(store, "createProducer")
-                 .andReturn(producer);
-        PowerMock.expectPrivate(store, "createConsumer")
-                 .andReturn(consumer);
-    }
-
-    private void expectStart() throws Exception {
+    private void expectStart() {
         initializer.run();
-        EasyMock.expectLastCall().times(1);
-
-        expectProducerAndConsumerCreate();
+        verify(initializer, times(1)).run();
     }
 
     private void expectStop() {
         producer.close();
-        PowerMock.expectLastCall();
+        verify(producer, times(1)).close();

Review Comment:
   > Does the producer.close() call trivially satisfy the following verify() 
call?
   
   So to be honest I don't know what's going on here. The literal translation of
   
   ```java
   producer.close();
   PowerMock.expectLastCall();
   ```
   to Mockito is
   ```java
   producer.close();
   verify(producer, times(1)).close();
   ```
   
   To answer your question: this does seem fishy, because of course if we call 
`producer.close()` we expect it to be called, unless some exception is thrown 
(which would fail the test anyway).
   
   > Also it looks like this method is not really translated from easymock yet, 
as it is separate from the store.stop() calls throughout the test. I think that 
instead of calling store.stop(), the test should call expectStop, and then the 
expectStop calls store.stop and asserts that the shutdown procedure happens.
   
   Are you alluding to the fact that the equivalent of `PowerMock.verifyAll();` is 
missing at the end of the test? If that's so, then yes, this is indeed the case, 
and it's one of the problems with PowerMock/EasyMock: it's not that 
explicit. `PowerMock.verifyAll()` verifies methods that are directly 
called (which is pointless almost all of the time), whereas ideally you are 
meant to use `verify` on methods that are called indirectly by the test, to 
ensure that they are actually executed. 
   
   So I guess to answer your question more directly: I can see what you are saying, 
but I would like to know what exactly a "shutdown procedure" actually means in 
terms of mocking, i.e. which methods (aside from `store.stop()`, which we 
call directly) should we be verifying? If there aren't any indirect methods 
that need to be verified with `verify`, then unless I am missing something, 
nothing really needs to be done here. As long as `stop` executes 
without throwing an exception, that is the best we can do?
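   
   For what it's worth, a minimal sketch of the shape I understand is being suggested (assuming `store.stop()` is what should trigger the shutdown, and that closing the producer is the indirect effect worth asserting):
   
   ```java
   private void expectStop() {
       store.stop();              // drive the shutdown through the class under test
       verify(producer).close();  // producer.close() should then happen indirectly
   }
   ```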



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14328) KafkaAdminClient should be Changing the exception level When an exception occurs

2022-10-27 Thread shizhenzhen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17624947#comment-17624947
 ] 

shizhenzhen commented on KAFKA-14328:
-

PR:  https://github.com/apache/kafka/pull/12793

> KafkaAdminClient should be Changing the exception level When an exception 
> occurs
> 
>
> Key: KAFKA-14328
> URL: https://issues.apache.org/jira/browse/KAFKA-14328
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 3.3
>Reporter: shizhenzhen
>Priority: Major
> Attachments: image-2022-10-21-11-19-21-064.png, 
> image-2022-10-21-14-56-31-753.png, image-2022-10-21-16-54-40-588.png, 
> image-2022-10-21-16-56-45-448.png, image-2022-10-21-16-58-19-353.png, 
> image-2022-10-24-14-28-10-365.png, image-2022-10-24-14-47-30-641.png, 
> image-2022-10-24-14-48-27-907.png
>
>
>  
>  
> Some of KafkaAdminClient's logging is entirely at log.trace level. When an exception occurs you have no idea what the cause is, which makes troubleshooting very difficult.
>  
> For example, in the spot below: when a Metadata request is sent, an exception is thrown if the queried topic has a partition with leader = -1;
>  
> but at that point the exception is effectively swallowed. After being thrown upwards it reaches the catch block shown in the second screenshot below,
> which puts the request back into the request queue. It then falls into endless retries until the timeout is reached and this exception is thrown: Timed out waiting for a node 
> assignment. Call: metadata
>  
> "Unable to assign a node to the Metadata request": under normal circumstances, who would know that the real underlying exception is actually
>  
> ```
> org.apache.kafka.common.errors.LeaderNotAvailableException: There is no 
> leader for this topic-partition as we are in the middle of a leadership 
> election.
>  
> ```
>  
>  
>  
>  
> !https://user-images.githubusercontent.com/10442648/196944422-e11b732f-6f7f-4f77-8d9c-1f0544257461.png!
>  
>  
>  
> The screenshot below shows the log after I changed it to warn level:
> !image-2022-10-21-11-19-21-064.png!
>  
> So I hope the log.trace here can be changed to log.warn, to give a hint
> that the current retry may be happening because of a particular exception (see the sketch after this quoted description).
>  
>  
> 
>  
>  
> !image-2022-10-21-14-56-31-753.png!
>  
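
The requested change boils down to logging the swallowed failure at warn before the request is re-queued, along these lines (illustrative only; `call` and `exception` stand in for whatever KafkaAdminClient has in scope at that point):

```java
// Illustrative only: the existing message is logged at trace, so the retry cause is
// invisible in a default setup; logging it at warn makes the reason for the retries
// obvious before the client finally fails with "Timed out waiting for a node assignment".
log.warn("Request {} failed and will be retried", call, exception);
```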



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance

2022-10-27 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17624948#comment-17624948
 ] 

A. Sophie Blee-Goldman commented on KAFKA-14016:


Hey, I realize there's a lot of history leading up to this issue and the 
associated "fix", so forgive me for missing anything while I'm getting up to 
speed – but taking a step back, before we jump into the discussion about 
alternative fixes for KAFKA-13891 can we flesh out the actual underlying 
problem & take stock of what symptoms people have actually seen vs theorized 
about?

Sorry for pushing back on this, it just seems like we've been playing 
whack-a-mole with these rebalancing issues lately, and the moles have started 
to whack us back. I just want to encourage us all to approach this carefully so 
we're not having the exact same conversation and reaching for yet more 
alternative fixes by the next release.

[~aiquestion] I guess this question is mostly directed at you, as the original 
reporter of KAFKA-13891: were you able to reproduce or experienced this in a 
real application, or was this mainly filed to follow up on the suggestion for 
it in KAFKA-13419? Sorry to repeat a bit of the conversation over on 
KAFKA-13419, but for full context here I was curious about the actual symptoms 
of KAFKA-13891 vs the scenario outlined in that ticket. Specifically, I think 
we need to expand and/or elaborate on the effect of having an old (but valid) 
generation when the consumer is still the most recent owner to claim its given 
partitions. The sticky assignment algorithm should account for this and IIRC 
will basically consider whoever has the highest valid generation for a 
partition as its previous owner, which in this case would always be the 
consumer that received this REBALANCE_IN_PROGRESS error.

 

To summarize: basically I'm not convinced that failing to reset the generation 
has any impact on the assignor in this case, i.e. that there is an actual 
bug/problem described in KAFKA-13891 – in fact, forcing the consumer to reset 
everything when a rebalance is restarted seems actively detrimental, as 
evidenced by this exact ticket. I would vote for just reverting the changes we 
made for that and calling it a day until/unless we see concrete evidence/symptoms 
impacting a real application.

[~showuon] / [~aiquestion] / [~guozhang] Thoughts?
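
To make the "highest valid generation wins" point above concrete, here is a toy sketch (this is not the CooperativeStickyAssignor code, just the tie-breaking idea):

```java
// Given each consumer's claimed generation for one partition, treat the consumer
// with the highest valid generation as that partition's previous owner.
static String previousOwner(Map<String, Integer> generationByConsumer) {
    return generationByConsumer.entrySet().stream()
            .max(Map.Entry.comparingByValue())
            .map(Map.Entry::getKey)
            .orElse(null);
}
```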

> Revoke more partitions than expected in Cooperative rebalance
> -
>
> Key: KAFKA-14016
> URL: https://issues.apache.org/jira/browse/KAFKA-14016
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.3.0
>Reporter: Shawn Wang
>Priority: Major
>  Labels: new-rebalance-should-fix
>
> In https://issues.apache.org/jira/browse/KAFKA-13419 we found that some 
> consumers didn't reset generation and state after a sync group failure with the 
> REBALANCE_IN_PROGRESS error.
> So we fixed it by resetting the generationId (but not the memberId) when sync group fails with 
> the REBALANCE_IN_PROGRESS error.
> But that change missed the reset part, so another change made in 
> https://issues.apache.org/jira/browse/KAFKA-13891 makes this work.
> After applying this change, we found that sometimes a consumer will revoke 
> almost 2/3 of its partitions with cooperative rebalancing enabled, because if a consumer 
> does a very quick re-join, other consumers will get REBALANCE_IN_PROGRESS in 
> syncGroup and revoke their partitions before re-joining. Example:
>  # consumers A1-A10 (ten consumers) joined and synced the group successfully with 
> generation 1 
>  # new consumer B1 joined and started a rebalance
>  # all consumers joined successfully, and then A1 needed to revoke a partition to 
> transfer it to B1
>  # A1 did a very quick syncGroup and re-join, because it revoked a partition
>  # A2-A10 didn't send syncGroup before A1 re-joined, so after they send 
> syncGroup, they will get REBALANCE_IN_PROGRESS
>  # A2-A10 will revoke their partitions and re-join
> So in this rebalance almost every partition was revoked, which greatly decreases 
> the benefit of cooperative rebalancing. 
> I think instead of "{*}resetStateAndRejoin{*} when 
> *RebalanceInProgressException* errors happen in {*}sync group{*}" we need 
> another way to fix it.
> Here is my proposal:
>  # revert the change in https://issues.apache.org/jira/browse/KAFKA-13891
>  # in the server-side Coordinator's handleSyncGroup, when the generationId has been checked and the group 
> state is PreparingRebalance, we can send the assignment along with the error 
> code REBALANCE_IN_PROGRESS (I think it's safe since we verified the 
> generation first)
>  # when the client gets the REBALANCE_IN_PROGRESS error, try to apply the 
> assignment first and then set rejoinNeeded = true to make it re-join 
> immediately 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

[GitHub] [kafka] mdedetrich commented on a diff in pull request #12781: KAFKA-14132; Replace EasyMock with Mockito in KafkaBasedLogTest

2022-10-27 Thread GitBox


mdedetrich commented on code in PR #12781:
URL: https://github.com/apache/kafka/pull/12781#discussion_r1006531902


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##
@@ -135,8 +161,7 @@ public class KafkaBasedLogTest {
 @SuppressWarnings("unchecked")
 @Before
 public void setUp() {
-        store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"},
-                TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, consumedCallback, time, initializer);
+        store = new MockedKafkaBasedLog(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, () -> null, consumedCallback, time, null);

Review Comment:
   > I think the @Mock private Runnable initializer is the incorrect signature 
here, and should be changed to be a Consumer mock. Otherwise the initializer 
value isn't behaving like a mock for the mocked kafka based log.
   
   So it appears that the `@Mock private Runnable initializer` is being used 
correctly in the `setupWithAdmin` method, and if you look at `setupWithAdmin` 
you can confirm this because we aren't using a mock; instead we are actually using 
the raw constructor on an extended class, i.e.
   
   ```java
   private void setupWithAdmin() {
       Supplier<TopicAdmin> adminSupplier = () -> admin;
       java.util.function.Consumer<TopicAdmin> initializer = admin -> { };
       store = new MockedKafkaBasedLog(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer);
   }
   ```
   
   I believe the oversight was just in the `setUp` of the mock.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on a diff in pull request #12781: KAFKA-14132; Replace EasyMock with Mockito in KafkaBasedLogTest

2022-10-27 Thread GitBox


mdedetrich commented on code in PR #12781:
URL: https://github.com/apache/kafka/pull/12781#discussion_r1006560892


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##
@@ -547,32 +538,18 @@ public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() throws Exception
     private void setupWithAdmin() {
         Supplier<TopicAdmin> adminSupplier = () -> admin;
         java.util.function.Consumer<TopicAdmin> initializer = admin -> { };
-        store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"},
-                TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer);
+        store = new MockedKafkaBasedLog(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer);
     }
 
-    private void expectProducerAndConsumerCreate() throws Exception {
-        PowerMock.expectPrivate(store, "createProducer")
-                 .andReturn(producer);
-        PowerMock.expectPrivate(store, "createConsumer")
-                 .andReturn(consumer);
-    }
-
-    private void expectStart() throws Exception {
+    private void expectStart() {
         initializer.run();
-        EasyMock.expectLastCall().times(1);
-
-        expectProducerAndConsumerCreate();
+        verify(initializer, times(1)).run();

Review Comment:
   Also answered in 
https://github.com/apache/kafka/pull/12781#discussion_r1006543034



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #12784: MINOR: Do not perform prodcuerCommit on serializationError when trying offsetWriter flush

2022-10-27 Thread GitBox


C0urante commented on PR #12784:
URL: https://github.com/apache/kafka/pull/12784#issuecomment-1293192830

   Thanks @sudeshwasnik, good catch! Would you mind filing a Jira for this so 
that others who encounter the same bug can track its status and fixed versions?
   
   Regarding the actual changes--I think we should expand unit testing coverage 
here. The tweak to `ExactlyOnceWorkerSourceTaskTest` does address the case of 
synchronous offset flush failures (i.e., ones detected before offsets are 
dispatched to the producer), but it drops coverage for the case of asynchronous 
offset flush failures (i.e., ones detected after offsets are dispatched to the 
producer). Can we account for both cases?
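   
   As a rough illustration of the two cases (Mockito-style pseudocode; the `offsetWriter` method names are assumptions, not necessarily the exact `OffsetStorageWriter` API):
   
   ```java
   // Synchronous failure: the flush fails before anything is dispatched to the producer.
   when(offsetWriter.beginFlush()).thenThrow(new ConnectException("serialization error"));
   
   // Asynchronous failure: the flush is dispatched, but the returned future fails later.
   CompletableFuture<Void> failedFlush = new CompletableFuture<>();
   failedFlush.completeExceptionally(new ConnectException("flush failed after dispatch"));
   when(offsetWriter.doFlush(any())).thenReturn(failedFlush);
   ```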


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] shekhar-rajak commented on pull request #12777: Replace EasyMock and PowerMock with Mockito - TimeOrderedWindowStoreTest

2022-10-27 Thread GitBox


shekhar-rajak commented on PR #12777:
URL: https://github.com/apache/kafka/pull/12777#issuecomment-1293213497

   @divijvaidya  Please have a look now. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on a diff in pull request #12781: KAFKA-14132; Replace EasyMock with Mockito in KafkaBasedLogTest

2022-10-27 Thread GitBox


mdedetrich commented on code in PR #12781:
URL: https://github.com/apache/kafka/pull/12781#discussion_r1006616025


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##
@@ -547,32 +538,18 @@ public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() throws Exception
     private void setupWithAdmin() {
         Supplier<TopicAdmin> adminSupplier = () -> admin;
         java.util.function.Consumer<TopicAdmin> initializer = admin -> { };
-        store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"},
-                TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer);
+        store = new MockedKafkaBasedLog(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer);
     }
 
-    private void expectProducerAndConsumerCreate() throws Exception {
-        PowerMock.expectPrivate(store, "createProducer")
-                 .andReturn(producer);
-        PowerMock.expectPrivate(store, "createConsumer")
-                 .andReturn(consumer);
-    }
-
-    private void expectStart() throws Exception {
+    private void expectStart() {
         initializer.run();
-        EasyMock.expectLastCall().times(1);
-
-        expectProducerAndConsumerCreate();
+        verify(initializer, times(1)).run();
     }
 
     private void expectStop() {
         producer.close();
-        PowerMock.expectLastCall();
+        verify(producer, times(1)).close();

Review Comment:
   Okay, so you can largely ignore my previous comment: I missed the `replayAll` 
that was being called elsewhere, which means we are actually expecting 
`producer.close()` to be called indirectly. While doing this I also realized 
that I need to change the `initializer` argument for the mock constructor in 
`setUp` (see https://github.com/apache/kafka/pull/12781#discussion_r1006531902).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on a diff in pull request #12781: KAFKA-14132; Replace EasyMock with Mockito in KafkaBasedLogTest

2022-10-27 Thread GitBox


mdedetrich commented on code in PR #12781:
URL: https://github.com/apache/kafka/pull/12781#discussion_r1006616339


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##
@@ -135,8 +161,7 @@ public class KafkaBasedLogTest {
 @SuppressWarnings("unchecked")
 @Before
 public void setUp() {
-        store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"},
-                TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, consumedCallback, time, initializer);
+        store = new MockedKafkaBasedLog(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, () -> null, consumedCallback, time, null);

Review Comment:
   See https://github.com/apache/kafka/pull/12781#discussion_r1006616025 for a 
solution



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12789: KAFKA-13989: Errors while evaluating connector type should return UNKNOWN

2022-10-27 Thread GitBox


C0urante commented on code in PR #12789:
URL: https://github.com/apache/kafka/pull/12789#discussion_r1006589610


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -700,6 +707,9 @@ public ConnectorType connectorTypeForClass(String 
connClass) {
  * @return the {@link ConnectorType} of the connector
  */
 public ConnectorType connectorTypeForConfig(Map 
connConfig) {

Review Comment:
   If we remove `connectorTypeForClass`, we can probably just rename this to 
`connectorType`.
   
   Also, the Javadoc states that `connConfig` may not be null. Are there cases 
where we anticipate that it will now be null? If so, we should update the 
Javadoc accordingly; if not, we should remove the null check.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -691,7 +691,14 @@ protected Connector getConnector(String connType) {
  * @param connClass class of the connector
  */
 public ConnectorType connectorTypeForClass(String connClass) {

Review Comment:
   Do we even need this separate method? It looks like it's only ever called in 
tests. Can we inline this logic into `connectorTypeForConfig` and tweak our 
tests to call that method instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12789: KAFKA-13989: Errors while evaluating connector type should return UNKNOWN

2022-10-27 Thread GitBox


C0urante commented on code in PR #12789:
URL: https://github.com/apache/kafka/pull/12789#discussion_r1006588134


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -691,7 +691,14 @@ protected Connector getConnector(String connType) {
  * @param connClass class of the connector
  */
 public ConnectorType connectorTypeForClass(String connClass) {

Review Comment:
   Do we even need this separate method anymore? It looks like it's only ever 
called in tests. Can we inline this logic into `connectorTypeForConfig` and 
tweak our tests to call that method instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on a diff in pull request #12781: KAFKA-14132; Replace EasyMock with Mockito in KafkaBasedLogTest

2022-10-27 Thread GitBox


mdedetrich commented on code in PR #12781:
URL: https://github.com/apache/kafka/pull/12781#discussion_r1006616339


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##
@@ -135,8 +161,7 @@ public class KafkaBasedLogTest {
 @SuppressWarnings("unchecked")
 @Before
 public void setUp() {
-        store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"},
-                TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, consumedCallback, time, initializer);
+        store = new MockedKafkaBasedLog(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, () -> null, consumedCallback, time, null);

Review Comment:
   So I have resolved this in the `Fix expectStart and expectStop along with 
initializer` commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on pull request #12781: KAFKA-14132; Replace EasyMock with Mockito in KafkaBasedLogTest

2022-10-27 Thread GitBox


mdedetrich commented on PR #12781:
URL: https://github.com/apache/kafka/pull/12781#issuecomment-1293252697

   @C0urante @gharris1727 
   
   So now that I understand what the `expectStart` and `expectStop` was meant 
to do, I have both fixed this issue as well as the `initializer` being of the 
wrong type problem in the `Fix expectStart and expectStop along with 
initializer` commit.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on a diff in pull request #12781: KAFKA-14132; Replace EasyMock with Mockito in KafkaBasedLogTest

2022-10-27 Thread GitBox


mdedetrich commented on code in PR #12781:
URL: https://github.com/apache/kafka/pull/12781#discussion_r1006635305


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##
@@ -115,10 +115,32 @@ public class KafkaBasedLogTest {
     private static final String TP1_VALUE_NEW = "VAL1_NEW";
 
     private Time time = new MockTime();
-    private KafkaBasedLog<String, String> store;
+
+    private class MockedKafkaBasedLog extends KafkaBasedLog<String, String> {
+        public MockedKafkaBasedLog(String topic,
+                                   Map<String, Object> producerConfigs,
+                                   Map<String, Object> consumerConfigs,
+                                   Supplier<TopicAdmin> topicAdminSupplier,
+                                   Callback<ConsumerRecord<String, String>> consumedCallback,
+                                   Time time,
+                                   Consumer<TopicAdmin> initializer) {
+            super(topic, producerConfigs, consumerConfigs, topicAdminSupplier, consumedCallback, time, initializer);
+        }
+
+        @Override
+        protected KafkaProducer<String, String> createProducer() {
+            return producer;
+        }
+
+        @Override
+        protected MockConsumer<String, String> createConsumer() {
+            return consumer;
+        }
+    }
+    private MockedKafkaBasedLog store;
 
     @Mock
-    private Runnable initializer;
+    private Consumer<TopicAdmin> initializer;

Review Comment:
   As pointed out by @gharris1727, the type of `initializer` had to be changed 
because it was in fact incorrect. I have no idea how this happened to work 
beforehand with EasyMock, but this appears to work along with changing 
`expectStart` to `verify(initializer).accept(any());` (since we now have a 
`Consumer` and not a `Runnable`).
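   
   A tiny illustration of that verification pattern (plain Mockito with static imports of `mock`, `verify`, and `any`, not the test's actual wiring):
   
   ```java
   @SuppressWarnings("unchecked")
   java.util.function.Consumer<TopicAdmin> initializer = mock(java.util.function.Consumer.class);
   
   // ... run the code that is expected to invoke the initializer ...
   
   // The Consumer-flavoured equivalent of the old verify(initializer, times(1)).run():
   verify(initializer).accept(any());
   ```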



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12777: Replace EasyMock and PowerMock with Mockito - TimeOrderedWindowStoreTest

2022-10-27 Thread GitBox


divijvaidya commented on code in PR #12777:
URL: https://github.com/apache/kafka/pull/12777#discussion_r1006648492


##
streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java:
##
@@ -1166,58 +1164,43 @@ public void shouldNotThrowInvalidBackwardRangeExceptionWithNegativeFromKey() {
     @Test
     public void shouldCloseCacheAndWrappedStoreAfterErrorDuringCacheFlush() {
         setUpCloseTests();
-        EasyMock.reset(cache);
-        cache.flush(CACHE_NAMESPACE);
-        EasyMock.expectLastCall().andThrow(new RuntimeException("Simulating an error on flush"));
-        cache.close(CACHE_NAMESPACE);
-        EasyMock.replay(cache);
-        EasyMock.reset(underlyingStore);
-        underlyingStore.close();
-        EasyMock.replay(underlyingStore);
+        doThrow(new RuntimeException(
+            "Simulating an error on flush"))
+            .when(cache).flush(CACHE_NAMESPACE);
+        reset(underlyingStore);
 
         assertThrows(RuntimeException.class, cachingStore::close);
-        EasyMock.verify(cache, underlyingStore);
+        verifyAndTearDownCloseTests();
     }
 
     @Test
     public void shouldCloseWrappedStoreAfterErrorDuringCacheClose() {
         setUpCloseTests();
-        EasyMock.reset(cache);
-        cache.flush(CACHE_NAMESPACE);
-        cache.close(CACHE_NAMESPACE);
-        EasyMock.expectLastCall().andThrow(new RuntimeException("Simulating an error on close"));
-        EasyMock.replay(cache);
-        EasyMock.reset(underlyingStore);
-        underlyingStore.close();
-        EasyMock.replay(underlyingStore);
+        doThrow(new RuntimeException("Simulating an error on close"))
+            .when(cache).close(CACHE_NAMESPACE);
 
+        reset(underlyingStore);

Review Comment:
   do we need this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance

2022-10-27 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17625003#comment-17625003
 ] 

Luke Chen commented on KAFKA-14016:
---

[~ableegoldman], thanks for the comment. Yes, we have logic in the sticky 
assignor to protect against multiple consumers claiming the same partition in the same 
highest generation. The original thought was that the same logic didn't exist in 
custom assignors. But after 
[KIP-792|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336614]
 gets implemented and merged (which should happen in the next release, v3.4.0), we 
should not need to worry about it anymore.

Therefore, for this issue, I'd also vote for just reverting the changes.

Thanks.

 

 

> Revoke more partitions than expected in Cooperative rebalance
> -
>
> Key: KAFKA-14016
> URL: https://issues.apache.org/jira/browse/KAFKA-14016
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.3.0
>Reporter: Shawn Wang
>Priority: Major
>  Labels: new-rebalance-should-fix
>
> In https://issues.apache.org/jira/browse/KAFKA-13419 we found that some 
> consumers didn't reset their generation and state after sync group failed with a 
> REBALANCE_IN_PROGRESS error.
> So we fixed it by resetting the generationId (but not the memberId) when sync group 
> fails with a REBALANCE_IN_PROGRESS error.
> But this change missed the reset part, so another change made in 
> https://issues.apache.org/jira/browse/KAFKA-13891 makes this work.
> After applying this change, we found that sometimes consumers will revoke 
> almost 2/3 of the partitions with cooperative rebalancing enabled, because if a 
> consumer does a very quick re-join, other consumers will get REBALANCE_IN_PROGRESS 
> in syncGroup and revoke their partitions before re-joining. Example:
>  # consumers A1-A10 (ten consumers) joined and synced the group successfully with 
> generation 1 
>  # new consumer B1 joined and started a rebalance
>  # all consumers joined successfully and then A1 needed to revoke a partition to 
> transfer to B1
>  # A1 did a very quick syncGroup and re-join, because it revoked a partition
>  # A2-A10 didn't send syncGroup before A1 re-joined, so after they sent 
> syncGroup, they got REBALANCE_IN_PROGRESS
>  # A2-A10 then revoked their partitions and re-joined
> So in this rebalance almost every partition was revoked, which greatly decreases 
> the benefit of cooperative rebalancing. 
> I think instead of "{*}resetStateAndRejoin{*} when 
> *RebalanceInProgressException* errors happen in {*}sync group{*}" we need 
> another way to fix it.
> Here is my proposal:
>  # revert the change in https://issues.apache.org/jira/browse/KAFKA-13891
>  # In the server-side Coordinator handleSyncGroup, when the generationId has been 
> checked and the group state is PreparingRebalance, we can send the assignment along 
> with the error code REBALANCE_IN_PROGRESS. (I think it's safe since we verified the 
> generation first.)
>  # When the client gets the REBALANCE_IN_PROGRESS error, try to apply the 
> assignment first and then set rejoinNeeded = true to make it re-join 
> immediately 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on a diff in pull request #12791: KAFKA-14338: Use MockTime in RetryUtilTest to eliminate flakiness

2022-10-27 Thread GitBox


C0urante commented on code in PR #12791:
URL: https://github.com/apache/kafka/pull/12791#discussion_r100618


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java:
##
@@ -50,16 +52,15 @@ public void setUp() throws Exception {
 @Test
 public void testSuccess() throws Exception {
 Mockito.when(mockCallable.call()).thenReturn("success");
-assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, 
testMsg, Duration.ofMillis(100), 1));
+assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, 
testMsg, Duration.ofMillis(100), 1, mockTime));
 Mockito.verify(mockCallable, Mockito.times(1)).call();
 }
 
-// timeout the test after 1000ms if unable to complete within a reasonable 
time frame
-@Test(timeout = 1000)
+@Test
 public void testExhaustingRetries() throws Exception {
 Mockito.when(mockCallable.call()).thenThrow(new TimeoutException());
 ConnectException e = assertThrows(ConnectException.class,

Review Comment:
   Nit: while we're in the neighborhood, can we remove this unnecessary 
variable? Can just change the line to:
   ```java
   assertThrows(ConnectException.class,
   ```



##
connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java:
##
@@ -116,16 +117,20 @@ public void testNoBackoffTimeAndSucceed() throws 
Exception {
 .thenThrow(new TimeoutException())
 .thenReturn("success");
 
-assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, 
testMsg, TIMEOUT, 0));
+assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, 
testMsg, Duration.ofMillis(100), 0, mockTime));
 Mockito.verify(mockCallable, Mockito.times(4)).call();
 }
 
 @Test
 public void testNoBackoffTimeAndFail() throws Exception {
-Mockito.when(mockCallable.call()).thenThrow(new 
TimeoutException("timeout exception"));
+Mockito.when(mockCallable.call()).thenAnswer(invocation -> {
+// Without any backoff time, the speed of the operation itself 
limits the number of retries and retry rate.
+mockTime.sleep(30);
+throw new TimeoutException("timeout exception");
+});

Review Comment:
   We can use the auto-tick feature of the `MockTime` class instead of adding 
in fake calls to `sleep` from our mock object.
   ```suggestion
   // Without backoff time, we never sleep; auto-tick helps move things 
along
   mockTime = new MockTime(10);
   Mockito.when(mockCallable.call()).thenThrow(new 
TimeoutException("timeout exception"));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on a diff in pull request #12725: KAFKA-14132; Replace EasyMock with Mockito ConnectorsResourceTest

2022-10-27 Thread GitBox


mdedetrich commented on code in PR #12725:
URL: https://github.com/apache/kafka/pull/12725#discussion_r1006723063


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java:
##
@@ -153,19 +157,23 @@ public class ConnectorsResourceTest {
 @Mock
 private WorkerConfig workerConfig;
 
+private MockedStatic restClientStatic;
+
 @Before
 public void setUp() throws NoSuchMethodException {
-PowerMock.mockStatic(RestClient.class,
-RestClient.class.getMethod("httpRequest", String.class, 
String.class, HttpHeaders.class, Object.class, TypeReference.class, 
WorkerConfig.class));
-
EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).andReturn(true);
-
EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).andReturn(true);
-PowerMock.replay(workerConfig);
+restClientStatic = mockStatic(RestClient.class);
+
when(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).thenReturn(true);
+
when(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).thenReturn(true);
 connectorsResource = new ConnectorsResource(herder, workerConfig);
-forward = EasyMock.mock(UriInfo.class);
+forward = mock(UriInfo.class);
 MultivaluedMap queryParams = new 
MultivaluedHashMap<>();
 queryParams.putSingle("forward", "true");
-
EasyMock.expect(forward.getQueryParameters()).andReturn(queryParams).anyTimes();
-EasyMock.replay(forward);
+when(forward.getQueryParameters()).thenReturn(queryParams);
+}
+
+@After
+public void teardown() {
+restClientStatic.close();

Review Comment:
   Found the problem and solved it in the `Add verifyNoMoreInteractions to 
tearDown` commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12784: MINOR: Do not perform prodcuerCommit on serializationError when trying offsetWriter flush

2022-10-27 Thread GitBox


vamossagar12 commented on code in PR #12784:
URL: https://github.com/apache/kafka/pull/12784#discussion_r1006758803


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##
@@ -280,16 +280,18 @@ private void commitTransaction() {
 
 // Commit the transaction
 // Blocks until all outstanding records have been sent and ack'd
-try {
-producer.commitTransaction();
-} catch (Throwable t) {
-log.error("{} Failed to commit producer transaction", 
ExactlyOnceWorkerSourceTask.this, t);
-flushError.compareAndSet(null, t);
+Throwable error = flushError.get();
+if (error == null) {

Review Comment:
   Can this be the else block of 
https://github.com/apache/kafka/pull/12784/files#diff-6073f350e3c1c4ca7334407b32d162012543dad6bcc048cc970f3d11e852e5f2L293?
 It seems like `transactionOpen = false;` is being set anyway in this case. WDYT?



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##
@@ -280,16 +280,18 @@ private void commitTransaction() {
 
 // Commit the transaction
 // Blocks until all outstanding records have been sent and ack'd
-try {
-producer.commitTransaction();
-} catch (Throwable t) {
-log.error("{} Failed to commit producer transaction", 
ExactlyOnceWorkerSourceTask.this, t);
-flushError.compareAndSet(null, t);
+Throwable error = flushError.get();
+if (error == null) {
+try {
+producer.commitTransaction();
+} catch (Throwable t) {
+log.error("{} Failed to commit producer transaction", 
ExactlyOnceWorkerSourceTask.this, t);
+flushError.compareAndSet(null, t);
+}
 }
-
 transactionOpen = false;
 
-Throwable error = flushError.get();
+error = flushError.get();

Review Comment:
   Is this call redundant? `error` is already set to `flushError.get()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #12784: MINOR: Do not perform prodcuerCommit on serializationError when trying offsetWriter flush

2022-10-27 Thread GitBox


vamossagar12 commented on PR #12784:
URL: https://github.com/apache/kafka/pull/12784#issuecomment-1293401822

   Thanks @sudeshwasnik . This is a nice catch. I have requested a couple of 
cosmetic changes. And I agree with Chris, we might need to increase the test 
coverage. Currently the only test change in this PR is removing an expected 
call on `FAIL_FLUSH_CALLBACK`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] sudeshwasnik commented on a diff in pull request #12784: MINOR: Do not perform prodcuerCommit on serializationError when trying offsetWriter flush

2022-10-27 Thread GitBox


sudeshwasnik commented on code in PR #12784:
URL: https://github.com/apache/kafka/pull/12784#discussion_r1006763859


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##
@@ -280,16 +280,18 @@ private void commitTransaction() {
 
 // Commit the transaction
 // Blocks until all outstanding records have been sent and ack'd
-try {
-producer.commitTransaction();
-} catch (Throwable t) {
-log.error("{} Failed to commit producer transaction", 
ExactlyOnceWorkerSourceTask.this, t);
-flushError.compareAndSet(null, t);
+Throwable error = flushError.get();
+if (error == null) {
+try {
+producer.commitTransaction();
+} catch (Throwable t) {
+log.error("{} Failed to commit producer transaction", 
ExactlyOnceWorkerSourceTask.this, t);
+flushError.compareAndSet(null, t);
+}
 }
-
 transactionOpen = false;
 
-Throwable error = flushError.get();
+error = flushError.get();

Review Comment:
   So we want to check if producer.commitTransaction itself resulted in an error; 
see line 289.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##
@@ -280,16 +280,18 @@ private void commitTransaction() {
 
 // Commit the transaction
 // Blocks until all outstanding records have been sent and ack'd
-try {
-producer.commitTransaction();
-} catch (Throwable t) {
-log.error("{} Failed to commit producer transaction", 
ExactlyOnceWorkerSourceTask.this, t);
-flushError.compareAndSet(null, t);
+Throwable error = flushError.get();
+if (error == null) {
+try {
+producer.commitTransaction();
+} catch (Throwable t) {
+log.error("{} Failed to commit producer transaction", 
ExactlyOnceWorkerSourceTask.this, t);
+flushError.compareAndSet(null, t);
+}
 }
-
 transactionOpen = false;
 
-Throwable error = flushError.get();
+error = flushError.get();

Review Comment:
   We want to check if producer.commitTransaction itself resulted in an error; 
see line 289.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12784: MINOR: Do not perform prodcuerCommit on serializationError when trying offsetWriter flush

2022-10-27 Thread GitBox


vamossagar12 commented on code in PR #12784:
URL: https://github.com/apache/kafka/pull/12784#discussion_r1006767287


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##
@@ -280,16 +280,18 @@ private void commitTransaction() {
 
 // Commit the transaction
 // Blocks until all outstanding records have been sent and ack'd
-try {
-producer.commitTransaction();
-} catch (Throwable t) {
-log.error("{} Failed to commit producer transaction", 
ExactlyOnceWorkerSourceTask.this, t);
-flushError.compareAndSet(null, t);
+Throwable error = flushError.get();
+if (error == null) {
+try {
+producer.commitTransaction();
+} catch (Throwable t) {
+log.error("{} Failed to commit producer transaction", 
ExactlyOnceWorkerSourceTask.this, t);
+flushError.compareAndSet(null, t);
+}
 }
-
 transactionOpen = false;
 
-Throwable error = flushError.get();
+error = flushError.get();

Review Comment:
   Oh ok. Now I see that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] sudeshwasnik commented on a diff in pull request #12784: MINOR: Do not perform prodcuerCommit on serializationError when trying offsetWriter flush

2022-10-27 Thread GitBox


sudeshwasnik commented on code in PR #12784:
URL: https://github.com/apache/kafka/pull/12784#discussion_r1006768071


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##
@@ -280,16 +280,18 @@ private void commitTransaction() {
 
 // Commit the transaction
 // Blocks until all outstanding records have been sent and ack'd
-try {
-producer.commitTransaction();
-} catch (Throwable t) {
-log.error("{} Failed to commit producer transaction", 
ExactlyOnceWorkerSourceTask.this, t);
-flushError.compareAndSet(null, t);
+Throwable error = flushError.get();
+if (error == null) {

Review Comment:
   We want to check for 
https://github.com/apache/kafka/pull/12784/files#diff-6073f350e3c1c4ca7334407b32d162012543dad6bcc048cc970f3d11e852e5f2R274
 first here. 
   If this was null, the next error could be the result of producer.commitTransaction 
itself, so we need to check for that error again. 
   An error resulting from either of those should throw an exception... which is 
what I'm intending to do. 
   lmk if I am missing your suggestion 😅 



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##
@@ -280,16 +280,18 @@ private void commitTransaction() {
 
 // Commit the transaction
 // Blocks until all outstanding records have been sent and ack'd
-try {
-producer.commitTransaction();
-} catch (Throwable t) {
-log.error("{} Failed to commit producer transaction", 
ExactlyOnceWorkerSourceTask.this, t);
-flushError.compareAndSet(null, t);
+Throwable error = flushError.get();
+if (error == null) {

Review Comment:
   We want to check for 
https://github.com/apache/kafka/pull/12784/files#diff-6073f350e3c1c4ca7334407b32d162012543dad6bcc048cc970f3d11e852e5f2R274
 first here. 
   If this was null, the next error could be the result of producer.commitTransaction 
itself, so we need to check for that error again. 
   An error resulting from either of those should throw an exception... which is 
what I'm intending to do. 
   lmk if I am missing your suggestion 😅 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] sudeshwasnik commented on a diff in pull request #12784: MINOR: Do not perform prodcuerCommit on serializationError when trying offsetWriter flush

2022-10-27 Thread GitBox


sudeshwasnik commented on code in PR #12784:
URL: https://github.com/apache/kafka/pull/12784#discussion_r1006768071


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##
@@ -280,16 +280,18 @@ private void commitTransaction() {
 
 // Commit the transaction
 // Blocks until all outstanding records have been sent and ack'd
-try {
-producer.commitTransaction();
-} catch (Throwable t) {
-log.error("{} Failed to commit producer transaction", 
ExactlyOnceWorkerSourceTask.this, t);
-flushError.compareAndSet(null, t);
+Throwable error = flushError.get();
+if (error == null) {

Review Comment:
   We want to check for 
https://github.com/apache/kafka/pull/12784/files#diff-6073f350e3c1c4ca7334407b32d162012543dad6bcc048cc970f3d11e852e5f2R274
 first here. 
   If this was null, the next error could be the result of producer.commitTransaction 
itself, so we need to check for that error again. 
   An error resulting from either of those should throw an exception... that's the 
intention.
   lmk if I am missing your suggestion 😅 
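
   As a rough sketch of the control flow being described (simplified stand-ins, 
not the actual ExactlyOnceWorkerSourceTask code): skip the producer commit when 
the offset flush already recorded an error, and re-check afterwards so that a 
failure from the commit itself is still surfaced.
   ```java
   import java.util.concurrent.atomic.AtomicReference;

   public class CommitFlowSketch {
       /** Simplified stand-in for the transactional producer. */
       interface TransactionalProducer {
           void commitTransaction();
       }

       private final AtomicReference<Throwable> flushError = new AtomicReference<>();

       void commitTransaction(TransactionalProducer producer) {
           // Skip the producer commit if the offset flush already recorded an error.
           Throwable error = flushError.get();
           if (error == null) {
               try {
                   producer.commitTransaction();
               } catch (Throwable t) {
                   // Record the commit failure so the re-check below surfaces it.
                   flushError.compareAndSet(null, t);
               }
           }
           // Re-read: the commit itself may have produced a new error.
           error = flushError.get();
           if (error != null) {
               throw new RuntimeException("Failed to flush offsets and/or commit transaction", error);
           }
       }
   }
   ```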



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] viktorsomogyi commented on pull request #12671: KAFKA-14250: MirrorSourceTask exception causes the task to fail

2022-10-27 Thread GitBox


viktorsomogyi commented on PR #12671:
URL: https://github.com/apache/kafka/pull/12671#issuecomment-1293420143

   @C0urante in fact this was an escalation that our customer brought to us; 
they were using a dedicated MM2 cluster. In the meantime, during another case 
with the customer, we figured out that 
[KAFKA-9851](https://issues.apache.org/jira/browse/KAFKA-9851) could have been 
the real cause and the fact that MM2 prints an error message is just a red 
herring. Regardless, it would be good to improve the error messages to state 
that the kind of failure experienced here won't persist between rebalances. 
What do you think?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12784: MINOR: Do not perform prodcuerCommit on serializationError when trying offsetWriter flush

2022-10-27 Thread GitBox


vamossagar12 commented on code in PR #12784:
URL: https://github.com/apache/kafka/pull/12784#discussion_r1006791813


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##
@@ -280,16 +280,18 @@ private void commitTransaction() {
 
 // Commit the transaction
 // Blocks until all outstanding records have been sent and ack'd
-try {
-producer.commitTransaction();
-} catch (Throwable t) {
-log.error("{} Failed to commit producer transaction", 
ExactlyOnceWorkerSourceTask.this, t);
-flushError.compareAndSet(null, t);
+Throwable error = flushError.get();
+if (error == null) {

Review Comment:
   Yeah it makes sense now. I missed the part on line #289.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a diff in pull request #12771: KAFKA-14299: Handle TaskCorruptedException during initialization

2022-10-27 Thread GitBox


ableegoldman commented on code in PR #12771:
URL: https://github.com/apache/kafka/pull/12771#discussion_r1006792420


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -810,10 +810,19 @@ private void transitRestoredTaskToRunning(final Task task,
 }
 
 private void addTasksToStateUpdater() {
+final Map taskExceptions = new 
LinkedHashMap<>();
 for (final Task task : tasks.drainPendingTaskToInit()) {
-task.initializeIfNeeded();
-stateUpdater.add(task);
+try {
+task.initializeIfNeeded();
+stateUpdater.add(task);
+} catch (final RuntimeException e) {
+// need to add task back to the bookkeeping to be handled by 
the stream thread
+tasks.addTask(task);

Review Comment:
   FWIW w.r.t the idea of attaching the entire task instead of just its taskId, 
I agree with Lucas here -- seems risky, would likely end up being abused as 
things progress (by us not the users), but most importantly imo is that it's 
just bad practice. I'm not too well versed on anti patterns I'll admit (perks & 
downsides of being a physics major 😉), but I would think this counts as one (or 
should) -- we shouldn't use exceptions to pass around actual objects rather 
than simple info/metadata. Especially objects of uncertain status, questionable 
thread safety, and significant resources attached. 
   
   Just my two cents as someone who's coming into the restoration thread game 
very late -- given this fact I'm not particularly qualified to weigh in on what 
the _right_ thing to do is here, though again I feel like Lucas's suggestion to 
maintain a/the `TaskRegistry` as a central, source-of-truth type thing for 
tracking and organizing all the tasks
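
   As a rough illustration of the taskId-only alternative discussed here (all 
names are invented for this sketch; none of this is actual Streams code), the 
exception carries just the id and the handler resolves the task through a 
central registry:
   ```java
   import java.util.Map;

   public class TaskIdExceptionSketch {
       /** Exception that carries only the task id, not the task object. */
       static class TaskInitializationException extends RuntimeException {
           final String taskId;

           TaskInitializationException(String taskId, Throwable cause) {
               super("Failed to initialize task " + taskId, cause);
               this.taskId = taskId;
           }
       }

       interface Task {
           String id();
           void initializeIfNeeded();
       }

       static void initialize(Task task) {
           try {
               task.initializeIfNeeded();
           } catch (RuntimeException e) {
               // Attach only the id; the catcher looks the task up in the registry.
               throw new TaskInitializationException(task.id(), e);
           }
       }

       static void handle(TaskInitializationException e, Map<String, Task> taskRegistry) {
           // Resolve the task through the central registry rather than from the exception.
           Task task = taskRegistry.get(e.taskId);
           if (task != null) {
               task.initializeIfNeeded(); // e.g. retry or hand back to the stream thread
           }
       }
   }
   ```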



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a diff in pull request #12773: KAFKA-14299: Return emptied ChangelogReader to ACTIVE_RESTORING

2022-10-27 Thread GitBox


ableegoldman commented on code in PR #12773:
URL: https://github.com/apache/kafka/pull/12773#discussion_r1006804068


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##
@@ -990,6 +991,10 @@ public void unregister(final Collection 
revokedChangelogs) {
 }
 
 removeChangelogsFromRestoreConsumer(revokedInitializedChangelogs);
+
+if (changelogs.isEmpty()) {
+state = ChangelogReaderState.ACTIVE_RESTORING;

Review Comment:
   Apologies if I'm missing something or my understanding of the 
restoration-related code is just completely out of date now, but do I 
understand correctly that this was/is a bug in the current (soon-to-be-old) 
architecture, and not related to the restoration thread work? 
   
   Also, just curious, did you find this bug due to you or someone else hitting 
it, or was it just something you happened to notice wasn't quite right? If 
you/someone is actually running into this "in the wild" it may be a good idea 
to file a jira ticket for it, just to document that the bug exists, we're aware 
of it, and that it's fixed in whichever versions.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##
@@ -1070,6 +1071,11 @@ public void shouldTransitState() {
 assertEquals(ACTIVE_RESTORING, changelogReader.state());
 assertEquals(mkSet(tp, tp1, tp2), consumer.assignment());
 assertEquals(mkSet(tp1, tp2), consumer.paused());
+

Review Comment:
   Wow, there is a LOT going on in this test...encroaching on 180 lines, yikes. 
Could you split this out into its own test to make it more clear what is being 
tested/that we have coverage for this edge case -- it's hard to get a grasp on 
whether we're actually testing everything when it's all crammed into a single 
test.
   
   I know you didn't write this monolith to begin with, but we should strive 
always to improve on the sins of our ancestors 😉 More seriously though, 
covering multiple things in one test can be useful eg with integration tests 
where the setup & teardown alone take a significant chunk of time. But I would 
imagine/hope a unit test like this runs fairly quickly so it probably makes the 
most sense to optimize for comprehension
   
   Sorry for the slightly pedantic speech, if it's any consolation I bet Bruno 
would have said more or less the same thing lol



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12783: KAFKA-14334: complete delayed purgatory after replication

2022-10-27 Thread GitBox


dajac commented on code in PR #12783:
URL: https://github.com/apache/kafka/pull/12783#discussion_r1006823247


##
core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala:
##
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package integration.kafka.server
+
+import kafka.server.{BaseFetchRequestTest, BrokerTopicStats, KafkaConfig}
+import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.admin.Admin
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.ApiKeys
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Timeout}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
+import java.util.Properties
+
+class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
+  val numNodes = 2
+  val numParts = 1
+  val initialMessages = 100
+  val nMessages = 100
+
+  val topic = "test-fetch-from-follower"
+  val leaderBrokerId = 0
+  val followerBrokerId = 1
+  var admin: Admin = null
+
+  def overridingProps: Properties = {
+val props = new Properties
+props.put(KafkaConfig.NumPartitionsProp, numParts.toString)
+
+props
+  }
+
+  override def generateConfigs: collection.Seq[KafkaConfig] =
+TestUtils.createBrokerConfigs(numNodes, zkConnectOrNull, 
enableControlledShutdown = false, enableFetchFromFollower = true)
+  .map(KafkaConfig.fromProps(_, overridingProps))
+
+  @BeforeEach
+  def initializeFetchFromFollowerCluster(): Unit = {
+// Create a 2 broker cluster where broker 0 is the leader and 1 is the 
follower.
+
+admin = TestUtils.createAdminClient(brokers, listenerName)
+TestUtils.createTopicWithAdminRaw(
+  admin,
+  topic,
+  replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId))
+)
+TestUtils.generateAndProduceMessages(brokers, topic, initialMessages)
+  }
+
+  @AfterEach
+  def close(): Unit = {
+if (admin != null) {
+  admin.close()
+}
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  @Timeout(30)
+  def testFollowerCompleteDelayedPurgatoryOnReplication(quorum: String): Unit 
= {
+TestUtils.generateAndProduceMessages(brokers, topic, nMessages)
+// set fetch.max.wait.ms to a value (45 seconds) greater than the timeout 
(30 seconds) to ensure that the
+// test only passes when the delayed fetch purgatory is completed after 
successfully replicating from the leader.
+
+val totalMessages = initialMessages + nMessages
+val topicPartition = new TopicPartition(topic, 0)
+val offsetMap = Map[TopicPartition, Long](
+  topicPartition -> (totalMessages - 1)

Review Comment:
   I suppose that the request stays in the purgatory when you fetch offset 200 
because there is no record at offset 200 yet and you don't produce anything 
after the fetch request is sent out.
   
   I think that your test does not really test what you want here. In my mind, 
you should do the following:
   * create the topic with no records.
   * fetch at offset 0 with a high max wait. the request should be placed in 
the purgatory.
   * produce one record.
   * fetch request should be completed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14339) producer commits-transaction even if offsetWriter faces serializationError

2022-10-27 Thread Sudesh Wasnik (Jira)
Sudesh Wasnik created KAFKA-14339:
-

 Summary: producer commits-transaction even if offsetWriter faces 
serializationError
 Key: KAFKA-14339
 URL: https://issues.apache.org/jira/browse/KAFKA-14339
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Sudesh Wasnik


In ExactlyOnceWorkerSourceTask, producer.commitTransaction is performed even if 
the offsetWriter faces a serialization error. 

This leads to no offsets being sent to the producer while the transaction is 
still committed. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on a diff in pull request #12783: KAFKA-14334: complete delayed purgatory after replication

2022-10-27 Thread GitBox


dajac commented on code in PR #12783:
URL: https://github.com/apache/kafka/pull/12783#discussion_r1006833685


##
core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala:
##
@@ -1100,6 +1100,66 @@ class ReplicaFetcherThreadTest {
 assertEquals(Collections.singletonList(tid1p0), 
fetchRequestBuilder2.removed())
   }
 
+  @Test
+  def testFetchResponseWithPartitionData(): Unit = {
+val appendInfo: LogAppendInfo = mock(classOf[LogAppendInfo])
+verifyLocalFetchCompletionAfterReplication(Some(appendInfo))
+  }
+
+  @Test
+  def testFetchResponseWithNoPartitionData(): Unit = {
+verifyLocalFetchCompletionAfterReplication(appendInfo = None)
+  }
+
+  private def verifyLocalFetchCompletionAfterReplication(appendInfo: 
Option[LogAppendInfo]): Unit = {
+val props = TestUtils.createBrokerConfig(1, "localhost:1234")
+val config = KafkaConfig.fromProps(props)
+
+val mockBlockingSend: BlockingSend = mock(classOf[BlockingSend])
+when(mockBlockingSend.brokerEndPoint()).thenReturn(brokerEndPoint)
+
+val log: UnifiedLog = mock(classOf[UnifiedLog])
+
+val partition: Partition = mock(classOf[Partition])
+when(partition.localLogOrException).thenReturn(log)
+when(partition.appendRecordsToFollowerOrFutureReplica(any[MemoryRecords], 
any[Boolean])).thenReturn(appendInfo)
+
+val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+
when(replicaManager.getPartitionOrException(any[TopicPartition])).thenReturn(partition)
+val brokerTopicStats = new BrokerTopicStats
+when(replicaManager.brokerTopicStats).thenReturn(brokerTopicStats)
+
+val replicaQuota: ReplicaQuota = mock(classOf[ReplicaQuota])
+
+val thread = createReplicaFetcherThread(
+  name = "replica-fetcher",
+  fetcherId = 0,
+  brokerConfig = config,
+  failedPartitions = failedPartitions,
+  replicaMgr = replicaManager,
+  quota = replicaQuota,
+  leaderEndpointBlockingSend = mockBlockingSend)
+
+val tp0 = new TopicPartition("testTopic", 0)
+val tp1 = new TopicPartition("testTopic", 1)
+val records = MemoryRecords.withRecords(CompressionType.NONE,
+  new SimpleRecord(1000, "foo".getBytes(StandardCharsets.UTF_8)))
+val partitionData: thread.FetchData = new FetchResponseData.PartitionData()
+  .setRecords(records)
+
+thread.processPartitionData(tp0, 0, partitionData.setPartitionIndex(0))
+thread.processPartitionData(tp1, 0, partitionData.setPartitionIndex(1))
+thread.doWork()
+appendInfo match {
+  case Some(_) =>
+val argument: ArgumentCaptor[Seq[TopicPartition]] = 
ArgumentCaptor.forClass(classOf[Seq[TopicPartition]])
+verify(replicaManager, 
times(1)).completeDelayedFetchRequests(argument.capture())

Review Comment:
   I am a bit surprised by this. Isn't it possible to do something like 
`verify(replicaManager, times(1)).completeDelayedFetchRequests(Seq())`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12783: KAFKA-14334: complete delayed purgatory after replication

2022-10-27 Thread GitBox


dajac commented on code in PR #12783:
URL: https://github.com/apache/kafka/pull/12783#discussion_r1006834496


##
core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala:
##
@@ -1100,6 +1100,66 @@ class ReplicaFetcherThreadTest {
 assertEquals(Collections.singletonList(tid1p0), 
fetchRequestBuilder2.removed())
   }
 
+  @Test
+  def testFetchResponseWithPartitionData(): Unit = {
+val appendInfo: LogAppendInfo = mock(classOf[LogAppendInfo])
+verifyLocalFetchCompletionAfterReplication(Some(appendInfo))
+  }
+
+  @Test
+  def testFetchResponseWithNoPartitionData(): Unit = {
+verifyLocalFetchCompletionAfterReplication(appendInfo = None)
+  }
+
+  private def verifyLocalFetchCompletionAfterReplication(appendInfo: 
Option[LogAppendInfo]): Unit = {
+val props = TestUtils.createBrokerConfig(1, "localhost:1234")
+val config = KafkaConfig.fromProps(props)
+
+val mockBlockingSend: BlockingSend = mock(classOf[BlockingSend])
+when(mockBlockingSend.brokerEndPoint()).thenReturn(brokerEndPoint)
+
+val log: UnifiedLog = mock(classOf[UnifiedLog])
+
+val partition: Partition = mock(classOf[Partition])
+when(partition.localLogOrException).thenReturn(log)
+when(partition.appendRecordsToFollowerOrFutureReplica(any[MemoryRecords], 
any[Boolean])).thenReturn(appendInfo)
+
+val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+
when(replicaManager.getPartitionOrException(any[TopicPartition])).thenReturn(partition)
+val brokerTopicStats = new BrokerTopicStats
+when(replicaManager.brokerTopicStats).thenReturn(brokerTopicStats)
+
+val replicaQuota: ReplicaQuota = mock(classOf[ReplicaQuota])
+
+val thread = createReplicaFetcherThread(
+  name = "replica-fetcher",
+  fetcherId = 0,
+  brokerConfig = config,
+  failedPartitions = failedPartitions,
+  replicaMgr = replicaManager,
+  quota = replicaQuota,
+  leaderEndpointBlockingSend = mockBlockingSend)
+
+val tp0 = new TopicPartition("testTopic", 0)
+val tp1 = new TopicPartition("testTopic", 1)
+val records = MemoryRecords.withRecords(CompressionType.NONE,
+  new SimpleRecord(1000, "foo".getBytes(StandardCharsets.UTF_8)))
+val partitionData: thread.FetchData = new FetchResponseData.PartitionData()
+  .setRecords(records)
+
+thread.processPartitionData(tp0, 0, partitionData.setPartitionIndex(0))
+thread.processPartitionData(tp1, 0, partitionData.setPartitionIndex(1))
+thread.doWork()
+appendInfo match {
+  case Some(_) =>
+val argument: ArgumentCaptor[Seq[TopicPartition]] = 
ArgumentCaptor.forClass(classOf[Seq[TopicPartition]])
+verify(replicaManager, 
times(1)).completeDelayedFetchRequests(argument.capture())
+assertEquals(Seq(tp0, tp1), argument.getValue)
+  case None =>
+verify(replicaManager, 
times(0)).completeDelayedFetchRequests(any[Seq[TopicPartition]])

Review Comment:
   nit: Could we verify that the set is actually empty here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12783: KAFKA-14334: complete delayed purgatory after replication

2022-10-27 Thread GitBox


dajac commented on code in PR #12783:
URL: https://github.com/apache/kafka/pull/12783#discussion_r1006836805


##
core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala:
##
@@ -1100,6 +1100,66 @@ class ReplicaFetcherThreadTest {
 assertEquals(Collections.singletonList(tid1p0), 
fetchRequestBuilder2.removed())
   }
 
+  @Test
+  def testFetchResponseWithPartitionData(): Unit = {
+val appendInfo: LogAppendInfo = mock(classOf[LogAppendInfo])
+verifyLocalFetchCompletionAfterReplication(Some(appendInfo))
+  }
+
+  @Test
+  def testFetchResponseWithNoPartitionData(): Unit = {
+verifyLocalFetchCompletionAfterReplication(appendInfo = None)
+  }
+
+  private def verifyLocalFetchCompletionAfterReplication(appendInfo: 
Option[LogAppendInfo]): Unit = {
+val props = TestUtils.createBrokerConfig(1, "localhost:1234")
+val config = KafkaConfig.fromProps(props)
+
+val mockBlockingSend: BlockingSend = mock(classOf[BlockingSend])
+when(mockBlockingSend.brokerEndPoint()).thenReturn(brokerEndPoint)
+
+val log: UnifiedLog = mock(classOf[UnifiedLog])
+
+val partition: Partition = mock(classOf[Partition])
+when(partition.localLogOrException).thenReturn(log)
+when(partition.appendRecordsToFollowerOrFutureReplica(any[MemoryRecords], 
any[Boolean])).thenReturn(appendInfo)
+
+val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+
when(replicaManager.getPartitionOrException(any[TopicPartition])).thenReturn(partition)
+val brokerTopicStats = new BrokerTopicStats
+when(replicaManager.brokerTopicStats).thenReturn(brokerTopicStats)
+
+val replicaQuota: ReplicaQuota = mock(classOf[ReplicaQuota])
+
+val thread = createReplicaFetcherThread(
+  name = "replica-fetcher",
+  fetcherId = 0,
+  brokerConfig = config,
+  failedPartitions = failedPartitions,
+  replicaMgr = replicaManager,
+  quota = replicaQuota,
+  leaderEndpointBlockingSend = mockBlockingSend)
+
+val tp0 = new TopicPartition("testTopic", 0)
+val tp1 = new TopicPartition("testTopic", 1)
+val records = MemoryRecords.withRecords(CompressionType.NONE,
+  new SimpleRecord(1000, "foo".getBytes(StandardCharsets.UTF_8)))
+val partitionData: thread.FetchData = new FetchResponseData.PartitionData()
+  .setRecords(records)

Review Comment:
   nit: In the case where `appendInfo` is `None`, it is a tad awkward that we 
set records here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a diff in pull request #12772: KAFKA-14299: Avoid busy polling in state updater

2022-10-27 Thread GitBox


ableegoldman commented on code in PR #12772:
URL: https://github.com/apache/kafka/pull/12772#discussion_r1006838904


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -353,19 +353,21 @@ public static StreamThread create(final TopologyMetadata 
topologyMetadata,
 final Map restoreConsumerConfigs = 
config.getRestoreConsumerConfigs(getRestoreConsumerClientId(threadId));
 final Consumer restoreConsumer = 
clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
 
+final boolean stateUpdaterEnabled =
+InternalConfig.getBoolean(config.originals(), 
InternalConfig.STATE_UPDATER_ENABLED, false);
+
 final StoreChangelogReader changelogReader = new StoreChangelogReader(
 time,
 config,

Review Comment:
   nit: we already pass the actual configs into the changelog reader, why not 
set the `stateUpdaterEnabled` flag inside the constructor/StoreChangelogReader 
class?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12783: KAFKA-14334: complete delayed purgatory after replication

2022-10-27 Thread GitBox


dajac commented on code in PR #12783:
URL: https://github.com/apache/kafka/pull/12783#discussion_r1006845821


##
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala:
##
@@ -132,9 +141,17 @@ class ReplicaFetcherThread(name: String,
 
 brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)
 
+logAppendInfo.foreach { _ => partitionsWithNewRecords += topicPartition }

Review Comment:
   I wonder if we could do slightly better than this. If you look at the produce 
path, we have the following code:
   
   ```
 result.info.leaderHwChange match {
   case LeaderHwChange.Increased =>
 // some delayed operations may be unblocked after HW changed
 delayedProducePurgatory.checkAndComplete(requestKey)
 delayedFetchPurgatory.checkAndComplete(requestKey)
 delayedDeleteRecordsPurgatory.checkAndComplete(requestKey)
   case LeaderHwChange.Same =>
 // probably unblock some follower fetch requests since log end offset 
has been updated
 delayedFetchPurgatory.checkAndComplete(requestKey)
   case LeaderHwChange.None =>
 // nothing
 }
   ```
   
   I wonder if we should do something similar here. Intuitively, we are 
interested in an updated HWM and not so much in new records, right? What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #12783: KAFKA-14334: complete delayed purgatory after replication

2022-10-27 Thread GitBox


dajac commented on PR #12783:
URL: https://github.com/apache/kafka/pull/12783#issuecomment-1293497553

   @jeffkbkim Thanks for the update. I left a few more comments/suggestions. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac closed pull request #12674: KAFKA-14255: Fetching from follower should be disallowed if fetch from follower is disabled

2022-10-27 Thread GitBox


dajac closed pull request #12674: KAFKA-14255: Fetching from follower should be 
disallowed if fetch from follower is disabled
URL: https://github.com/apache/kafka/pull/12674


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dengziming commented on a diff in pull request #12790: KAFKA-14337: correctly remove topicsWithCollisionChars after topic deletion

2022-10-27 Thread GitBox


dengziming commented on code in PR #12790:
URL: https://github.com/apache/kafka/pull/12790#discussion_r1006976348


##
core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala:
##
@@ -470,6 +471,30 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
 TestUtils.verifyTopicDeletion(zkClientOrNull, testTopicName, 1, brokers)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testTopicWithCollidingCharDeletionAndCreateAgain(quorum: String): Unit = 
{
+// create the topic with colliding chars
+val topicWithCollidingChar = "test.a"
+val createOpts = new TopicCommandOptions(Array("--partitions", "1",
+  "--replication-factor", "1",
+  "--topic", topicWithCollidingChar))
+createAndWaitTopic(createOpts)
+
+// delete the topic
+val deleteOpts = new TopicCommandOptions(Array("--topic", 
topicWithCollidingChar))
+
+if (!isKRaftTest()) {
+  val deletePath = DeleteTopicsTopicZNode.path(topicWithCollidingChar)
+  assertFalse(zkClient.pathExists(deletePath), "Delete path for topic 
shouldn't exist before deletion.")
+}
+topicService.deleteTopic(deleteOpts)
+TestUtils.verifyTopicDeletion(zkClientOrNull, topicWithCollidingChar, 1, 
brokers)
+
+val createTopic: Executable = () => createAndWaitTopic(createOpts)
+assertDoesNotThrow(createTopic)

Review Comment:
   I'm investigating why there is a compile error when directly using 
`assertDoesNotThrow(() => createAndWaitTopic(createOpts))`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #12366: KAFKA-14021: Implement new KIP-618 APIs in MirrorSourceConnector

2022-10-27 Thread GitBox


C0urante commented on PR #12366:
URL: https://github.com/apache/kafka/pull/12366#issuecomment-1293697763

   Thanks @OmniaGM, good idea. I've updated the README and added an integration 
test that verifies that MM2 can still run with exactly-once support enabled.
   
   I should note that the `testReplication` test case is currently failing 
locally with timeout issues, but succeeds when I bump the timeout. Going to see 
how the Jenkins build goes; we may choose to increase the timeout in these 
tests if they also fail during CI.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac closed pull request #12734: KAFKA-14255; Return an empty record instead of an OffsetOutOfRangeException when fetching from a follower without a leader epoch

2022-10-27 Thread GitBox


dajac closed pull request #12734: KAFKA-14255; Return an empty record instead 
of an OffsetOutOfRangeException when fetching from a follower without a leader 
epoch
URL: https://github.com/apache/kafka/pull/12734


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on a diff in pull request #12544: KAFKA-14098: Add meaningful default client IDs for Connect workers

2022-10-27 Thread GitBox


mimaison commented on code in PR #12544:
URL: https://github.com/apache/kafka/pull/12544#discussion_r1007088373


##
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java:
##
@@ -106,6 +108,7 @@ public Connect startConnect(Map 
workerProps) {
 // Create the admin client to be shared by all backing stores.
 Map adminProps = new HashMap<>(config.originals());
 ConnectUtils.addMetricsContextProperties(adminProps, config, 
kafkaClusterId);
+adminProps.putIfAbsent(CLIENT_ID_CONFIG, "connect-cluster-" + 
config.groupId());

Review Comment:
   Yeah, the advertised URL is the only identifier we have for a unique worker, 
but it could be "ugly" and pretty long. 
   
   Ideally users would provide a unique identifier (like the pod name) as the 
client.id and we would prefix/suffix it with the groupId and a short name like 
`shared-admin`.
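
   A hypothetical sketch of that naming scheme (the helper and the values below 
are illustrative only, not actual Connect code): combine the group id, an 
optional user-supplied identifier such as the pod name, and a short role suffix, 
and only apply the result when `client.id` was not set explicitly.
   ```java
   import java.util.HashMap;
   import java.util.Map;

   public class ClientIdSketch {
       // Illustrative helper: groupId + optional user-supplied id + role suffix.
       static String defaultClientId(String groupId, String userSuppliedId, String role) {
           String base = "connect-cluster-" + groupId;
           if (userSuppliedId == null || userSuppliedId.isEmpty()) {
               return base + "-" + role;
           }
           return base + "-" + userSuppliedId + "-" + role;
       }

       public static void main(String[] args) {
           Map<String, Object> adminProps = new HashMap<>();
           // putIfAbsent keeps any client.id the user configured explicitly.
           adminProps.putIfAbsent("client.id", defaultClientId("my-connect-group", "pod-7", "shared-admin"));
           System.out.println(adminProps); // {client.id=connect-cluster-my-connect-group-pod-7-shared-admin}
       }
   }
   ```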



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance

2022-10-27 Thread Shawn Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17625187#comment-17625187
 ] 

Shawn Wang commented on KAFKA-14016:


[~ableegoldman] 

Yes, we experienced KAFKA-13891

Here is the full history of our case:

We have several consumer groups with more than 2000 consumers each. We are 
using Kafka Broker 2.3 && Kafka Client 2.5. We enabled StaticMembership && 
Cooperative Rebalance to avoid STW time during rebalances.
 # we found that in some cases partitions would be assigned in duplicate, so we 
patched KAFKA-12984, KAFKA-12983 and KAFKA-13406
 # after we deployed online, we found that some consumer groups would rebalance 
for a long time (2 hours) before finally getting Stable, so we then patched 
KAFKA-13891.
 # after deploying, we experienced more partition lag when rebalances happened. 
Then I created this issue to try to get some advice.
 # Actually we worked around it by 'ignoring the generation value when the leader 
calculates the assignment' (just setting every member's generation to unknown). And 
after running online for more than 2 months, it looks good now.

 

For "The sticky assignment algorithm should account for this and IIRC will 
basically consider whoever has the highest valid generation for a partition as 
its previous owner", i think in the code does't implement in this way 
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java#L127]

Please kindly correct me if i'm wrong. In this code we clear all previous owned 
parittions if we got a higer geneartion, so only the ownedPartitions with 
highest generation will be valid

 

I think "The sticky assignment algorithm should account for this and IIRC will 
basically consider whoever has the highest valid generation for a partition as 
its previous owner" is also a fix for this.
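
As a simplified sketch of the behaviour described above (illustrative only, not the actual AbstractStickyAssignor code): once a higher generation is seen, owned-partition claims collected from older generations are discarded, so only claims from the highest generation remain.
```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HighestGenerationSketch {
    /**
     * Keep only the owned-partition claims reported with the highest generation.
     * Claims gathered from members with an older generation are dropped as soon
     * as a higher generation is seen.
     */
    static Map<String, List<String>> claimsFromHighestGeneration(Map<String, Integer> memberGenerations,
                                                                 Map<String, List<String>> ownedPartitions) {
        int maxGeneration = Integer.MIN_VALUE;
        Map<String, List<String>> valid = new HashMap<>();
        for (Map.Entry<String, Integer> entry : memberGenerations.entrySet()) {
            int generation = entry.getValue();
            if (generation > maxGeneration) {
                // A higher generation invalidates everything collected so far.
                maxGeneration = generation;
                valid.clear();
            }
            if (generation == maxGeneration) {
                valid.put(entry.getKey(),
                        new ArrayList<>(ownedPartitions.getOrDefault(entry.getKey(), new ArrayList<>())));
            }
        }
        return valid;
    }
}
```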

 

> Revoke more partitions than expected in Cooperative rebalance
> -
>
> Key: KAFKA-14016
> URL: https://issues.apache.org/jira/browse/KAFKA-14016
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.3.0
>Reporter: Shawn Wang
>Priority: Major
>  Labels: new-rebalance-should-fix
>
> In https://issues.apache.org/jira/browse/KAFKA-13419 we found that some 
> consumers didn't reset their generation and state after sync group failed with a 
> REBALANCE_IN_PROGRESS error.
> So we fixed it by resetting the generationId (but not the memberId) when sync group 
> fails with a REBALANCE_IN_PROGRESS error.
> But this change missed the reset part, so another change made in 
> https://issues.apache.org/jira/browse/KAFKA-13891 makes this work.
> After applying this change, we found that sometimes consumers will revoke 
> almost 2/3 of the partitions with cooperative rebalancing enabled, because if a 
> consumer does a very quick re-join, other consumers will get REBALANCE_IN_PROGRESS 
> in syncGroup and revoke their partitions before re-joining. Example:
>  # consumers A1-A10 (ten consumers) joined and synced the group successfully with 
> generation 1 
>  # new consumer B1 joined and started a rebalance
>  # all consumers joined successfully and then A1 needed to revoke a partition to 
> transfer to B1
>  # A1 did a very quick syncGroup and re-join, because it revoked a partition
>  # A2-A10 didn't send syncGroup before A1 re-joined, so after they sent 
> syncGroup, they got REBALANCE_IN_PROGRESS
>  # A2-A10 then revoked their partitions and re-joined
> So in this rebalance almost every partition was revoked, which greatly decreases 
> the benefit of cooperative rebalancing. 
> I think instead of "{*}resetStateAndRejoin{*} when 
> *RebalanceInProgressException* errors happen in {*}sync group{*}" we need 
> another way to fix it.
> Here is my proposal:
>  # revert the change in https://issues.apache.org/jira/browse/KAFKA-13891
>  # In the server-side Coordinator handleSyncGroup, when the generationId has been 
> checked and the group state is PreparingRebalance, we can send the assignment along 
> with the error code REBALANCE_IN_PROGRESS. (I think it's safe since we verified the 
> generation first.)
>  # When the client gets the REBALANCE_IN_PROGRESS error, try to apply the 
> assignment first and then set rejoinNeeded = true to make it re-join 
> immediately 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance

2022-10-27 Thread Shawn Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17625187#comment-17625187
 ] 

Shawn Wang edited comment on KAFKA-14016 at 10/27/22 4:08 PM:
--

[~ableegoldman] 

Yes, we experienced KAFKA-13891

Here is the full history of our case:

We have several consumer groups with more than 2000 consumers each. We are 
using Kafka Broker 2.3 && Kafka Client 2.5. We enabled the built-in 
StaticMembership && Cooperative Rebalance to avoid STW time during rebalances.
 # we found that in some cases partitions would be assigned in duplicate, so we 
patched KAFKA-12984, KAFKA-12983 and KAFKA-13406
 # after we deployed online, we found that some consumer groups would rebalance 
for a long time (2 hours) before finally getting Stable, so we then patched 
KAFKA-13891.
 # after deploying, we experienced more partition lag when rebalances happened. 
Then I created this issue to try to get some advice.
 # Actually we worked around it by 'ignoring the generation value when the leader 
calculates the assignment' (just setting every member's generation to unknown). And 
after running online for more than 2 months, it looks good now.

 

For "The sticky assignment algorithm should account for this and IIRC will 
basically consider whoever has the highest valid generation for a partition as 
its previous owner", i think in the code does't implement in this way 
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java#L127]

Please kindly correct me if i'm wrong. In this code we clear all previous owned 
parittions if we got a higer geneartion, so only the ownedPartitions with 
highest generation will be valid

 

I think "The sticky assignment algorithm should account for this and IIRC will 
basically consider whoever has the highest valid generation for a partition as 
its previous owner" is also a fix for this.

 


was (Author: JIRAUSER289108):
[~ableegoldman] 

Yes, we experienced KAFKA-13891

Here is the full history of our case:

We have several consumer groups with more than 2000 consumers each. We are 
using Kafka Broker 2.3 && Kafka Client 2.5. We enabled StaticMembership && 
Cooperative Rebalance to avoid STW time during rebalances.
 # we found that in some cases partitions would be assigned in duplicate, so we 
patched KAFKA-12984, KAFKA-12983 and KAFKA-13406
 # after we deployed online, we found that some consumer groups would rebalance 
for a long time (2 hours) before finally getting Stable, so we then patched 
KAFKA-13891.
 # after deploying, we experienced more partition lag when rebalances happened. 
Then I created this issue to try to get some advice.
 # Actually we worked around it by 'ignoring the generation value when the leader 
calculates the assignment' (just setting every member's generation to unknown). And 
after running online for more than 2 months, it looks good now.

 

For "The sticky assignment algorithm should account for this and IIRC will 
basically consider whoever has the highest valid generation for a partition as 
its previous owner", i think in the code does't implement in this way 
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java#L127]

Please kindly correct me if i'm wrong. In this code we clear all previous owned 
parittions if we got a higer geneartion, so only the ownedPartitions with 
highest generation will be valid

 

I think "The sticky assignment algorithm should account for this and IIRC will 
basically consider whoever has the highest valid generation for a partition as 
its previous owner" is also a fix for this.

 

> Revoke more partitions than expected in Cooperative rebalance
> -
>
> Key: KAFKA-14016
> URL: https://issues.apache.org/jira/browse/KAFKA-14016
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.3.0
>Reporter: Shawn Wang
>Priority: Major
>  Labels: new-rebalance-should-fix
>
> In https://issues.apache.org/jira/browse/KAFKA-13419 we found that some 
> consumers didn't reset generation and state after a sync group failed with a 
> REBALANCE_IN_PROGRESS error.
> So we fixed it by resetting the generationId (but not the memberId) when sync group fails with a 
> REBALANCE_IN_PROGRESS error.
> But this change missed the reset part, so another change made in 
> https://issues.apache.org/jira/browse/KAFKA-13891 makes this work.
> After applying this change, we found that sometimes a consumer will revoke 
> almost 2/3 of the partitions with cooperative rebalancing enabled, because if a consumer 
> does a very quick re-join, other consumers will get REBALANCE_IN_PROGRESS in 
> syncGroup and revoke their partitions before re-joining. Example:
>  # 

[jira] [Commented] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance

2022-10-27 Thread Shawn Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17625192#comment-17625192
 ] 

Shawn Wang commented on KAFKA-14016:


I also vote for reverting KAFKA-13891 since that previous state in step 3 happens 
randomly and doesn't have a big impact.

But I don't think 
[KIP-792|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336614]
 can solve this problem. It moves the generationId from userData to 
ConsumerProtocolSubscription and doesn't change the logic of the built-in 
CooperativeStickyAssignor.

> Revoke more partitions than expected in Cooperative rebalance
> -
>
> Key: KAFKA-14016
> URL: https://issues.apache.org/jira/browse/KAFKA-14016
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.3.0
>Reporter: Shawn Wang
>Priority: Major
>  Labels: new-rebalance-should-fix
>
> In https://issues.apache.org/jira/browse/KAFKA-13419 we found that some 
> consumers didn't reset generation and state after a sync group failed with a 
> REBALANCE_IN_PROGRESS error.
> So we fixed it by resetting the generationId (but not the memberId) when sync group fails with a 
> REBALANCE_IN_PROGRESS error.
> But this change missed the reset part, so another change made in 
> https://issues.apache.org/jira/browse/KAFKA-13891 makes this work.
> After applying this change, we found that sometimes a consumer will revoke 
> almost 2/3 of the partitions with cooperative rebalancing enabled, because if a consumer 
> does a very quick re-join, other consumers will get REBALANCE_IN_PROGRESS in 
> syncGroup and revoke their partitions before re-joining. Example:
>  # consumers A1-A10 (ten consumers) joined and synced the group successfully with 
> generation 1
>  # new consumer B1 joined and started a rebalance
>  # all consumers joined successfully, and then A1 needed to revoke a partition to 
> transfer it to B1
>  # A1 did a very quick syncGroup and re-join, because it had revoked a partition
>  # A2-A10 had not sent syncGroup before A1 re-joined, so after they sent 
> syncGroup they got REBALANCE_IN_PROGRESS
>  # A2-A10 then revoked their partitions and re-joined
> So in this rebalance almost every partition was revoked, which greatly reduces 
> the benefit of cooperative rebalancing.
> I think instead of "{*}resetStateAndRejoin{*} when 
> *RebalanceInProgressException* errors happen in {*}sync group{*}" we need 
> another way to fix it.
> Here is my proposal:
>  # revert the change in https://issues.apache.org/jira/browse/KAFKA-13891
>  # in the server-side coordinator's handleSyncGroup, once the generationId has been checked and the group 
> state is PreparingRebalance, send the assignment along with the error 
> code REBALANCE_IN_PROGRESS (I think it's safe since we verified the 
> generation first)
>  # when the client gets the REBALANCE_IN_PROGRESS error, apply the 
> assignment first and then set rejoinNeeded = true to make it re-join 
> immediately



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance

2022-10-27 Thread Shawn Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17625192#comment-17625192
 ] 

Shawn Wang edited comment on KAFKA-14016 at 10/27/22 4:13 PM:
--

I also vote for reverting KAFKA-13891 since the case happens randomly and 
doesn't have a big impact.

But I don't think 
[KIP-792|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336614]
 can solve this problem. It moves the generationId from userData to 
ConsumerProtocolSubscription and doesn't change the logic of the built-in 
CooperativeStickyAssignor.


was (Author: JIRAUSER289108):
I also vote for reverting KAFKA-13891 since that previous state in step 3 happens 
randomly and doesn't have a big impact.

But I don't think 
[KIP-792|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336614]
 can solve this problem. It moves the generationId from userData to 
ConsumerProtocolSubscription and doesn't change the logic of the built-in 
CooperativeStickyAssignor.

> Revoke more partitions than expected in Cooperative rebalance
> -
>
> Key: KAFKA-14016
> URL: https://issues.apache.org/jira/browse/KAFKA-14016
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.3.0
>Reporter: Shawn Wang
>Priority: Major
>  Labels: new-rebalance-should-fix
>
> In https://issues.apache.org/jira/browse/KAFKA-13419 we found that some 
> consumers didn't reset generation and state after a sync group failed with a 
> REBALANCE_IN_PROGRESS error.
> So we fixed it by resetting the generationId (but not the memberId) when sync group fails with a 
> REBALANCE_IN_PROGRESS error.
> But this change missed the reset part, so another change made in 
> https://issues.apache.org/jira/browse/KAFKA-13891 makes this work.
> After applying this change, we found that sometimes a consumer will revoke 
> almost 2/3 of the partitions with cooperative rebalancing enabled, because if a consumer 
> does a very quick re-join, other consumers will get REBALANCE_IN_PROGRESS in 
> syncGroup and revoke their partitions before re-joining. Example:
>  # consumers A1-A10 (ten consumers) joined and synced the group successfully with 
> generation 1
>  # new consumer B1 joined and started a rebalance
>  # all consumers joined successfully, and then A1 needed to revoke a partition to 
> transfer it to B1
>  # A1 did a very quick syncGroup and re-join, because it had revoked a partition
>  # A2-A10 had not sent syncGroup before A1 re-joined, so after they sent 
> syncGroup they got REBALANCE_IN_PROGRESS
>  # A2-A10 then revoked their partitions and re-joined
> So in this rebalance almost every partition was revoked, which greatly reduces 
> the benefit of cooperative rebalancing.
> I think instead of "{*}resetStateAndRejoin{*} when 
> *RebalanceInProgressException* errors happen in {*}sync group{*}" we need 
> another way to fix it.
> Here is my proposal:
>  # revert the change in https://issues.apache.org/jira/browse/KAFKA-13891
>  # in the server-side coordinator's handleSyncGroup, once the generationId has been checked and the group 
> state is PreparingRebalance, send the assignment along with the error 
> code REBALANCE_IN_PROGRESS (I think it's safe since we verified the 
> generation first)
>  # when the client gets the REBALANCE_IN_PROGRESS error, apply the 
> assignment first and then set rejoinNeeded = true to make it re-join 
> immediately



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance

2022-10-27 Thread Shawn Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17625192#comment-17625192
 ] 

Shawn Wang edited comment on KAFKA-14016 at 10/27/22 4:13 PM:
--

I also vote for reverting KAFKA-13891 since the case happens randomly and 
doesn't have a big impact.

But I don't think 
[KIP-792|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336614]
 can solve this problem. It moved the generationId from userData to 
ConsumerProtocolSubscription and didn't change the logic of the built-in 
CooperativeStickyAssignor.


was (Author: JIRAUSER289108):
I also vote for reverting KAFKA-13891 since the case happens randomly and 
doesn't have a big impact.

But I don't think 
[KIP-792|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336614]
 can solve this problem. It moves the generationId from userData to 
ConsumerProtocolSubscription and doesn't change the logic of the built-in 
CooperativeStickyAssignor.

> Revoke more partitions than expected in Cooperative rebalance
> -
>
> Key: KAFKA-14016
> URL: https://issues.apache.org/jira/browse/KAFKA-14016
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.3.0
>Reporter: Shawn Wang
>Priority: Major
>  Labels: new-rebalance-should-fix
>
> In https://issues.apache.org/jira/browse/KAFKA-13419 we found that some 
> consumers didn't reset generation and state after a sync group failed with a 
> REBALANCE_IN_PROGRESS error.
> So we fixed it by resetting the generationId (but not the memberId) when sync group fails with a 
> REBALANCE_IN_PROGRESS error.
> But this change missed the reset part, so another change made in 
> https://issues.apache.org/jira/browse/KAFKA-13891 makes this work.
> After applying this change, we found that sometimes a consumer will revoke 
> almost 2/3 of the partitions with cooperative rebalancing enabled, because if a consumer 
> does a very quick re-join, other consumers will get REBALANCE_IN_PROGRESS in 
> syncGroup and revoke their partitions before re-joining. Example:
>  # consumers A1-A10 (ten consumers) joined and synced the group successfully with 
> generation 1
>  # new consumer B1 joined and started a rebalance
>  # all consumers joined successfully, and then A1 needed to revoke a partition to 
> transfer it to B1
>  # A1 did a very quick syncGroup and re-join, because it had revoked a partition
>  # A2-A10 had not sent syncGroup before A1 re-joined, so after they sent 
> syncGroup they got REBALANCE_IN_PROGRESS
>  # A2-A10 then revoked their partitions and re-joined
> So in this rebalance almost every partition was revoked, which greatly reduces 
> the benefit of cooperative rebalancing.
> I think instead of "{*}resetStateAndRejoin{*} when 
> *RebalanceInProgressException* errors happen in {*}sync group{*}" we need 
> another way to fix it.
> Here is my proposal:
>  # revert the change in https://issues.apache.org/jira/browse/KAFKA-13891
>  # in the server-side coordinator's handleSyncGroup, once the generationId has been checked and the group 
> state is PreparingRebalance, send the assignment along with the error 
> code REBALANCE_IN_PROGRESS (I think it's safe since we verified the 
> generation first)
>  # when the client gets the REBALANCE_IN_PROGRESS error, apply the 
> assignment first and then set rejoinNeeded = true to make it re-join 
> immediately



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] rhauch merged pull request #12621: MINOR: Migrate connect system tests to KRaft

2022-10-27 Thread GitBox


rhauch merged PR #12621:
URL: https://github.com/apache/kafka/pull/12621


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance

2022-10-27 Thread Shawn Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17625187#comment-17625187
 ] 

Shawn Wang edited comment on KAFKA-14016 at 10/27/22 4:20 PM:
--

[~ableegoldman] 

Yes, we experienced KAFKA-13891

Here is the full history of our case:

We have several consumer groups with more than 2000 consumers each, and we are 
using Kafka Broker 2.3 && Kafka Client 2.5. We enabled the built-in 
StaticMembership && Cooperative Rebalance to avoid STW time during rebalances.
 # we found that in some cases a partition would be assigned to more than one consumer, so we 
patched KAFKA-12984, KAFKA-12983, KAFKA-13406
 # after we deployed online, we found that some consumer groups would rebalance 
for a long time (2 hours) before finally becoming stable, so we then patched 
KAFKA-13891.
 # after deploying, we experienced more partition lag when a rebalance happened. 
Then I created this issue to try to get some advice.
 # We finally worked around it by 'ignoring the generation value when the leader 
calculates the assignment' (just setting every member's generation to unknown) && 
reverting KAFKA-13891. And after running online for more than 2 months, it looks 
good now.

 

For "The sticky assignment algorithm should account for this and IIRC will 
basically consider whoever has the highest valid generation for a partition as 
its previous owner", I don't think the code implements it this way: 
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java#L127]

Please kindly correct me if I'm wrong. In this code we clear all previously owned 
partitions if we see a higher generation, so only the ownedPartitions with the 
highest generation will be considered valid.

 

I think "The sticky assignment algorithm should account for this and IIRC will 
basically consider whoever has the highest valid generation for a partition as 
its previous owner" is also a fix for this.

 


was (Author: JIRAUSER289108):
[~ableegoldman] 

Yes, we experienced KAFKA-13891

Here is the full history of our case:

We have several consumer groups with more than 2000 consumers each, and we are 
using Kafka Broker 2.3 && Kafka Client 2.5. We enabled the built-in 
StaticMembership && Cooperative Rebalance to avoid STW time during rebalances.
 # we found that in some cases a partition would be assigned to more than one consumer, so we 
patched KAFKA-12984, KAFKA-12983, KAFKA-13406
 # after we deployed online, we found that some consumer groups would rebalance 
for a long time (2 hours) before finally becoming stable, so we then patched 
KAFKA-13891.
 # after deploying, we experienced more partition lag when a rebalance happened. 
Then I created this issue to try to get some advice.
 # Actually we worked around it by 'ignoring the generation value when the leader 
calculates the assignment' (just setting every member's generation to unknown). And 
after running online for more than 2 months, it looks good now.

 

For "The sticky assignment algorithm should account for this and IIRC will 
basically consider whoever has the highest valid generation for a partition as 
its previous owner", I don't think the code implements it this way: 
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java#L127]

Please kindly correct me if I'm wrong. In this code we clear all previously owned 
partitions if we see a higher generation, so only the ownedPartitions with the 
highest generation will be considered valid.

 

I think "The sticky assignment algorithm should account for this and IIRC will 
basically consider whoever has the highest valid generation for a partition as 
its previous owner" is also a fix for this.

 

> Revoke more partitions than expected in Cooperative rebalance
> -
>
> Key: KAFKA-14016
> URL: https://issues.apache.org/jira/browse/KAFKA-14016
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.3.0
>Reporter: Shawn Wang
>Priority: Major
>  Labels: new-rebalance-should-fix
>
> In https://issues.apache.org/jira/browse/KAFKA-13419 we found that some 
> consumers didn't reset generation and state after a sync group failed with a 
> REBALANCE_IN_PROGRESS error.
> So we fixed it by resetting the generationId (but not the memberId) when sync group fails with a 
> REBALANCE_IN_PROGRESS error.
> But this change missed the reset part, so another change made in 
> https://issues.apache.org/jira/browse/KAFKA-13891 makes this work.
> After applying this change, we found that sometimes a consumer will revoke 
> almost 2/3 of the partitions with cooperative rebalancing enabled, because if a consumer 
> did a very quick re-join, other consumers will get REBALANCE_IN_PROGRESS in 
> syncGroup and revoked their part

[GitHub] [kafka] rhauch commented on pull request #12621: MINOR: Migrate connect system tests to KRaft

2022-10-27 Thread GitBox


rhauch commented on PR #12621:
URL: https://github.com/apache/kafka/pull/12621#issuecomment-1293774760

   Merged to the `trunk` branch and cherry-picked to the `3.3` branch.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-27 Thread GitBox


mimaison commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r1007096564


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeForwardingAdminWithLocalMetadata.java:
##
@@ -0,0 +1,117 @@
+package org.apache.kafka.connect.mirror.clients.admin;

Review Comment:
   This is missing the license



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeLocalMetadataStore.java:
##
@@ -0,0 +1,15 @@
+package org.apache.kafka.connect.mirror.clients.admin;

Review Comment:
   This is missing the license



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:
##
@@ -0,0 +1,596 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.DescribeConfigsResult;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter;
+import org.apache.kafka.connect.mirror.MirrorCheckpointConnector;
+import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
+import org.apache.kafka.connect.mirror.MirrorMakerConfig;
+import org.apache.kafka.connect.mirror.MirrorSourceConnector;
+import org.apache.kafka.connect.mirror.SourceAndTarget;
+import 
org.apache.kafka.connect.mirror.clients.admin.FakeForwardingAdminWithLocalMetadata;
+import org.apache.kafka.connect.mirror.clients.admin.FakeLocalMetadataStore;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.apache.kafka.connect.util.clusters.UngracefulShutdownException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.connect.mirror.MirrorConnectorConfig.FORWARDING_ADMIN_CLASS;
+import static org.apache.kafka.connect.mirror.TestUtils.generateRecords;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests MM2 is using provided ForwardingAd

[GitHub] [kafka] dajac commented on a diff in pull request #12783: KAFKA-14334: complete delayed purgatory after replication

2022-10-27 Thread GitBox


dajac commented on code in PR #12783:
URL: https://github.com/apache/kafka/pull/12783#discussion_r1007133812


##
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala:
##
@@ -132,9 +141,17 @@ class ReplicaFetcherThread(name: String,
 
 brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)
 
+logAppendInfo.foreach { _ => partitionsWithNewRecords += topicPartition }

Review Comment:
   Note that the HWM is updated separately here: 
`updateHighWatermarkAndStartOffset`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on a diff in pull request #12769: KAFKA-14314: Add check for null upstreamTopic

2022-10-27 Thread GitBox


mimaison commented on code in PR #12769:
URL: https://github.com/apache/kafka/pull/12769#discussion_r1007143335


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java:
##
@@ -319,4 +320,17 @@ public void testRefreshTopicPartitionsTopicOnTargetFirst() 
throws Exception {
 connector.refreshTopicPartitions();
 verify(connector, times(1)).computeAndCreateTopicPartitions();
 }
+
+@Test
+public void testIsCycleWithNullUpstreamTopic() throws Exception {

Review Comment:
   I think we can drop `throws Exception`



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java:
##
@@ -319,4 +320,17 @@ public void testRefreshTopicPartitionsTopicOnTargetFirst() 
throws Exception {
 connector.refreshTopicPartitions();
 verify(connector, times(1)).computeAndCreateTopicPartitions();
 }
+
+@Test
+public void testIsCycleWithNullUpstreamTopic() throws Exception {
+class BadReplicationPolicy extends DefaultReplicationPolicy {

Review Comment:
   What about `CustomReplicationPolicy` as it's not really a bad policy, `null` 
is allowed
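   For illustration, a minimal sketch of such a test policy, assuming only that `upstreamTopic` 
may legitimately return `null` (the actual test code may differ):

```java
import org.apache.kafka.connect.mirror.DefaultReplicationPolicy;

// Hypothetical test helper: a policy whose upstreamTopic() can return null.
class CustomReplicationPolicy extends DefaultReplicationPolicy {
    @Override
    public String upstreamTopic(String topic) {
        return null; // simulate a topic that has no upstream source cluster
    }
}
```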



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12783: KAFKA-14334: complete delayed purgatory after replication

2022-10-27 Thread GitBox


dajac commented on code in PR #12783:
URL: https://github.com/apache/kafka/pull/12783#discussion_r1007153544


##
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala:
##
@@ -132,9 +141,17 @@ class ReplicaFetcherThread(name: String,
 
 brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)
 
+logAppendInfo.foreach { _ => partitionsWithNewRecords += topicPartition }

Review Comment:
   I am actually not sure about this. I need to think about it a little more. 
Let me know what you think.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on pull request #12792: [Test]gradle oom fix

2022-10-27 Thread GitBox


philipnee commented on PR #12792:
URL: https://github.com/apache/kafka/pull/12792#issuecomment-1293833011

   
[0c166c0](https://github.com/apache/kafka/pull/12792/commits/0c166c0cc456619b4168736820e05b06dc7ae7ab)
 - no gradle oom


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee closed pull request #12038: [WIP] KAFKA-13421

2022-10-27 Thread GitBox


philipnee closed pull request #12038: [WIP] KAFKA-13421
URL: https://github.com/apache/kafka/pull/12038


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2022-10-27 Thread GitBox


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1007181238


##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -618,7 +618,8 @@ private TransactionManager 
configureTransactionState(ProducerConfig config,
  * @throws org.apache.kafka.common.errors.UnsupportedVersionException 
fatal error indicating the broker
  * does not support transactions (i.e. if its version is lower 
than 0.11.0.0)
  * @throws org.apache.kafka.common.errors.AuthorizationException fatal 
error indicating that the configured
- * transactional.id is not authorized. See the exception for more 
details
+ * transactional.id is not authorized, or the idempotent producer 
id is unavailable. See the exception for

Review Comment:
   One could get an authorization issue when requesting a producerId; it is not 
necessarily caused by an unavailable broker. Yes, maybe remove the word "fatal". 
Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2022-10-27 Thread GitBox


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1007185990


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -155,7 +155,7 @@ private enum State {
 private boolean isTransitionValid(State source, State target) {
 switch (target) {
 case UNINITIALIZED:
-return source == READY;
+return source == READY || source == ABORTABLE_ERROR;

Review Comment:
   It's one way (that I could think of) to propagate the authorization error 
from the sender loop; otherwise, the sender will continue to retry and cause 
timeouts in some cases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2022-10-27 Thread GitBox


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1007187187


##
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java:
##
@@ -1237,6 +1238,34 @@ public void testInitTransactionWhileThrottled() {
 }
 }
 
+@Test
+public void testClusterAuthorizationFailure() throws Exception {
+int maxBlockMs = 500;
+
+Map configs = new HashMap<>();
+configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
+configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some-txn");
+
+Time time = new MockTime(1);
+MetadataResponse initialUpdateResponse = 
RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+ProducerMetadata metadata = newMetadata(500, Long.MAX_VALUE);
+
+MockClient client = new MockClient(time, metadata);
+client.updateMetadata(initialUpdateResponse);
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
"some-txn", NODE));
+client.prepareResponse(initProducerIdResponse(1L, (short) 5, 
Errors.CLUSTER_AUTHORIZATION_FAILED));
+Producer producer = kafkaProducer(configs, new 
StringSerializer(),
+new StringSerializer(), metadata, client, null, time);
+assertThrows(ClusterAuthorizationException.class, 
producer::initTransactions);
+
+// retry initTransactions after the ClusterAuthorizationException not 
being thrown
+client.prepareResponse(initProducerIdResponse(1L, (short) 5, 
Errors.NONE));
+TestUtils.retryOnExceptionWithTimeout(1000, 100, 
producer::initTransactions);

Review Comment:
   yes!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on pull request #12790: KAFKA-14337: correctly remove topicsWithCollisionChars after topic deletion

2022-10-27 Thread GitBox


hachikuji commented on PR #12790:
URL: https://github.com/apache/kafka/pull/12790#issuecomment-1293873487

   Great find! Agree with @dengziming that we should have a test in 
`ReplicationControlManagerTest`. Otherwise, LGTM.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bbejeck commented on pull request #12465: KAFKA-12950: Replace EasyMock and PowerMock with Mockito for KafkaStreamsTest

2022-10-27 Thread GitBox


bbejeck commented on PR #12465:
URL: https://github.com/apache/kafka/pull/12465#issuecomment-1293885171

   Failures unrelated


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12783: KAFKA-14334: complete delayed purgatory after replication

2022-10-27 Thread GitBox


jeffkbkim commented on code in PR #12783:
URL: https://github.com/apache/kafka/pull/12783#discussion_r1007204668


##
core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala:
##
@@ -1100,6 +1100,66 @@ class ReplicaFetcherThreadTest {
 assertEquals(Collections.singletonList(tid1p0), 
fetchRequestBuilder2.removed())
   }
 
+  @Test
+  def testFetchResponseWithPartitionData(): Unit = {
+val appendInfo: LogAppendInfo = mock(classOf[LogAppendInfo])
+verifyLocalFetchCompletionAfterReplication(Some(appendInfo))
+  }
+
+  @Test
+  def testFetchResponseWithNoPartitionData(): Unit = {
+verifyLocalFetchCompletionAfterReplication(appendInfo = None)
+  }
+
+  private def verifyLocalFetchCompletionAfterReplication(appendInfo: 
Option[LogAppendInfo]): Unit = {
+val props = TestUtils.createBrokerConfig(1, "localhost:1234")
+val config = KafkaConfig.fromProps(props)
+
+val mockBlockingSend: BlockingSend = mock(classOf[BlockingSend])
+when(mockBlockingSend.brokerEndPoint()).thenReturn(brokerEndPoint)
+
+val log: UnifiedLog = mock(classOf[UnifiedLog])
+
+val partition: Partition = mock(classOf[Partition])
+when(partition.localLogOrException).thenReturn(log)
+when(partition.appendRecordsToFollowerOrFutureReplica(any[MemoryRecords], 
any[Boolean])).thenReturn(appendInfo)
+
+val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+
when(replicaManager.getPartitionOrException(any[TopicPartition])).thenReturn(partition)
+val brokerTopicStats = new BrokerTopicStats
+when(replicaManager.brokerTopicStats).thenReturn(brokerTopicStats)
+
+val replicaQuota: ReplicaQuota = mock(classOf[ReplicaQuota])
+
+val thread = createReplicaFetcherThread(
+  name = "replica-fetcher",
+  fetcherId = 0,
+  brokerConfig = config,
+  failedPartitions = failedPartitions,
+  replicaMgr = replicaManager,
+  quota = replicaQuota,
+  leaderEndpointBlockingSend = mockBlockingSend)
+
+val tp0 = new TopicPartition("testTopic", 0)
+val tp1 = new TopicPartition("testTopic", 1)
+val records = MemoryRecords.withRecords(CompressionType.NONE,
+  new SimpleRecord(1000, "foo".getBytes(StandardCharsets.UTF_8)))
+val partitionData: thread.FetchData = new FetchResponseData.PartitionData()
+  .setRecords(records)
+
+thread.processPartitionData(tp0, 0, partitionData.setPartitionIndex(0))
+thread.processPartitionData(tp1, 0, partitionData.setPartitionIndex(1))
+thread.doWork()
+appendInfo match {
+  case Some(_) =>
+val argument: ArgumentCaptor[Seq[TopicPartition]] = 
ArgumentCaptor.forClass(classOf[Seq[TopicPartition]])
+verify(replicaManager, 
times(1)).completeDelayedFetchRequests(argument.capture())
+assertEquals(Seq(tp0, tp1), argument.getValue)
+  case None =>
+verify(replicaManager, 
times(0)).completeDelayedFetchRequests(any[Seq[TopicPartition]])

Review Comment:
   I'm not sure I follow. replicaManager.completeDelayedFetchRequests is not 
called when appendInfo is empty. I will add a check for both cases to assert 
that the underlying buffer is empty after thread.doWork.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2022-10-27 Thread GitBox


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1007207887


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -155,7 +155,7 @@ private enum State {
 private boolean isTransitionValid(State source, State target) {
 switch (target) {
 case UNINITIALIZED:
-return source == READY;
+return source == READY || source == ABORTABLE_ERROR;

Review Comment:
   Hmmm. I see. I'm just wondering if we are hijacking the current state 
machine in an unexpected way and if there are implications there. I suppose we 
are only following this path on these specific error types, but I wonder if we 
are missing anything existing by changing the valid transitions and/or opening 
up the potential for something else in the future. 
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bbejeck merged pull request #12465: KAFKA-12950: Replace EasyMock and PowerMock with Mockito for KafkaStreamsTest

2022-10-27 Thread GitBox


bbejeck merged PR #12465:
URL: https://github.com/apache/kafka/pull/12465


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bbejeck commented on pull request #12465: KAFKA-12950: Replace EasyMock and PowerMock with Mockito for KafkaStreamsTest

2022-10-27 Thread GitBox


bbejeck commented on PR #12465:
URL: https://github.com/apache/kafka/pull/12465#issuecomment-1293889869

   Merged #12465 into trunk


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bbejeck commented on pull request #12465: KAFKA-12950: Replace EasyMock and PowerMock with Mockito for KafkaStreamsTest

2022-10-27 Thread GitBox


bbejeck commented on PR #12465:
URL: https://github.com/apache/kafka/pull/12465#issuecomment-1293890128

   Thanks for the contribution @divijvaidya !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12783: KAFKA-14334: complete delayed purgatory after replication

2022-10-27 Thread GitBox


jeffkbkim commented on code in PR #12783:
URL: https://github.com/apache/kafka/pull/12783#discussion_r1007215458


##
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala:
##
@@ -132,9 +141,17 @@ class ReplicaFetcherThread(name: String,
 
 brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)
 
+logAppendInfo.foreach { _ => partitionsWithNewRecords += topicPartition }

Review Comment:
   I think this makes sense. Consumer fetches will at most fetch up to the HWM, 
so we can add the topic partition only when `LeaderHwChange.Increased`.
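   A hypothetical Java rendering of that idea (the real follower code is Scala, and 
`ReplicaManagerLike` below is a stand-in interface rather than a Kafka class): collect only the 
partitions whose append raised the follower high watermark, then complete the delayed fetches 
once per fetch response.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.common.TopicPartition;

// Illustration only, not actual Kafka code.
final class DelayedFetchCompletionSketch {

    interface ReplicaManagerLike {
        void completeDelayedFetchRequests(List<TopicPartition> partitions);
    }

    private final List<TopicPartition> partitionsWithHwmIncrease = new ArrayList<>();

    // called once per partition in the fetch response
    void onFollowerAppend(TopicPartition tp, boolean highWatermarkIncreased) {
        // consumer fetches are served at most up to the HWM, so an append that does not
        // move the HWM cannot complete a delayed consumer fetch
        if (highWatermarkIncreased) {
            partitionsWithHwmIncrease.add(tp);
        }
    }

    // called once per fetch response, after all partitions have been processed
    void maybeCompleteDelayedFetches(ReplicaManagerLike replicaManager) {
        if (!partitionsWithHwmIncrease.isEmpty()) {
            replicaManager.completeDelayedFetchRequests(new ArrayList<>(partitionsWithHwmIncrease));
            partitionsWithHwmIncrease.clear();
        }
    }
}
```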



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-12950) Replace EasyMock and PowerMock with Mockito for KafkaStreamsTest

2022-10-27 Thread Bill Bejeck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17625246#comment-17625246
 ] 

Bill Bejeck commented on KAFKA-12950:
-

Merged into trunk

> Replace EasyMock and PowerMock with Mockito for KafkaStreamsTest
> 
>
> Key: KAFKA-12950
> URL: https://issues.apache.org/jira/browse/KAFKA-12950
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, unit tests
>Reporter: Josep Prat
>Assignee: Divij Vaidya
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14337) topic name with "." cannot be created after deletion

2022-10-27 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-14337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-14337:
---
Fix Version/s: 3.4.0
   3.3.2

> topic name with "." cannot be created after deletion
> 
>
> Key: KAFKA-14337
> URL: https://issues.apache.org/jira/browse/KAFKA-14337
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.3.1
>Reporter: thanhnd96
>Assignee: Luke Chen
>Priority: Critical
> Fix For: 3.4.0, 3.3.2
>
>
> Hi admin,
> My issue is after i create topic like topic.AAA or Topic.AAA.01 then delete 1 
> of the other 2 topics.
> Then i can't create 1 of the 2 topics.
> But i create topic test123 then delete and recreate fine.
> This is log i tried to create topic.AAA
> WARN [Controller 1] createTopics: failed with unknown server exception 
> NoSuchElementException at epoch 14 in 193 us.  Renouncing leadership and 
> reverting to the last committed offset 28. 
> (org.apache.kafka.controller.QuorumController)
> java.util.NoSuchElementException
>         at 
> org.apache.kafka.timeline.SnapshottableHashTable$CurrentIterator.next(SnapshottableHashTable.java:167)
>         at 
> org.apache.kafka.timeline.SnapshottableHashTable$CurrentIterator.next(SnapshottableHashTable.java:139)
>         at 
> org.apache.kafka.timeline.TimelineHashSet$ValueIterator.next(TimelineHashSet.java:120)
>         at 
> org.apache.kafka.controller.ReplicationControlManager.validateNewTopicNames(ReplicationControlManager.java:799)
>         at 
> org.apache.kafka.controller.ReplicationControlManager.createTopics(ReplicationControlManager.java:567)
>         at 
> org.apache.kafka.controller.QuorumController.lambda$createTopics$7(QuorumController.java:1832)
>         at 
> org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:767)
>         at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121)
>         at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200)
>         at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173)
>         at java.base/java.lang.Thread.run(Thread.java:829)
> ERROR [Controller 1] processBrokerHeartbeat: unable to start processing 
> because of NotControllerException. 
> (org.apache.kafka.controller.QuorumController)
>  
> I'm run kafka mode Kraft !!!
> Tks admin.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on a diff in pull request #12783: KAFKA-14334: complete delayed purgatory after replication

2022-10-27 Thread GitBox


dajac commented on code in PR #12783:
URL: https://github.com/apache/kafka/pull/12783#discussion_r1007243383


##
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala:
##
@@ -132,9 +141,17 @@ class ReplicaFetcherThread(name: String,
 
 brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)
 
+logAppendInfo.foreach { _ => partitionsWithNewRecords += topicPartition }

Review Comment:
   Yeah, that's right. We just need to ensure that the HWM is always <= log end 
offset. Otherwise, this does not work. I will read this code again tomorrow.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12783: KAFKA-14334: complete delayed purgatory after replication

2022-10-27 Thread GitBox


dajac commented on code in PR #12783:
URL: https://github.com/apache/kafka/pull/12783#discussion_r1007243845


##
core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala:
##
@@ -1100,6 +1100,66 @@ class ReplicaFetcherThreadTest {
 assertEquals(Collections.singletonList(tid1p0), 
fetchRequestBuilder2.removed())
   }
 
+  @Test
+  def testFetchResponseWithPartitionData(): Unit = {
+val appendInfo: LogAppendInfo = mock(classOf[LogAppendInfo])
+verifyLocalFetchCompletionAfterReplication(Some(appendInfo))
+  }
+
+  @Test
+  def testFetchResponseWithNoPartitionData(): Unit = {
+verifyLocalFetchCompletionAfterReplication(appendInfo = None)
+  }
+
+  private def verifyLocalFetchCompletionAfterReplication(appendInfo: 
Option[LogAppendInfo]): Unit = {
+val props = TestUtils.createBrokerConfig(1, "localhost:1234")
+val config = KafkaConfig.fromProps(props)
+
+val mockBlockingSend: BlockingSend = mock(classOf[BlockingSend])
+when(mockBlockingSend.brokerEndPoint()).thenReturn(brokerEndPoint)
+
+val log: UnifiedLog = mock(classOf[UnifiedLog])
+
+val partition: Partition = mock(classOf[Partition])
+when(partition.localLogOrException).thenReturn(log)
+when(partition.appendRecordsToFollowerOrFutureReplica(any[MemoryRecords], 
any[Boolean])).thenReturn(appendInfo)
+
+val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+
when(replicaManager.getPartitionOrException(any[TopicPartition])).thenReturn(partition)
+val brokerTopicStats = new BrokerTopicStats
+when(replicaManager.brokerTopicStats).thenReturn(brokerTopicStats)
+
+val replicaQuota: ReplicaQuota = mock(classOf[ReplicaQuota])
+
+val thread = createReplicaFetcherThread(
+  name = "replica-fetcher",
+  fetcherId = 0,
+  brokerConfig = config,
+  failedPartitions = failedPartitions,
+  replicaMgr = replicaManager,
+  quota = replicaQuota,
+  leaderEndpointBlockingSend = mockBlockingSend)
+
+val tp0 = new TopicPartition("testTopic", 0)
+val tp1 = new TopicPartition("testTopic", 1)
+val records = MemoryRecords.withRecords(CompressionType.NONE,
+  new SimpleRecord(1000, "foo".getBytes(StandardCharsets.UTF_8)))
+val partitionData: thread.FetchData = new FetchResponseData.PartitionData()
+  .setRecords(records)
+
+thread.processPartitionData(tp0, 0, partitionData.setPartitionIndex(0))
+thread.processPartitionData(tp1, 0, partitionData.setPartitionIndex(1))
+thread.doWork()
+appendInfo match {
+  case Some(_) =>
+val argument: ArgumentCaptor[Seq[TopicPartition]] = 
ArgumentCaptor.forClass(classOf[Seq[TopicPartition]])
+verify(replicaManager, 
times(1)).completeDelayedFetchRequests(argument.capture())
+assertEquals(Seq(tp0, tp1), argument.getValue)
+  case None =>
+verify(replicaManager, 
times(0)).completeDelayedFetchRequests(any[Seq[TopicPartition]])

Review Comment:
   ah, right. i missed the `times(0)`. my bad.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #12793: [KAFKA-14328]: KafkaAdminClient should be Changing the exception level …

2022-10-27 Thread GitBox


guozhangwang commented on code in PR #12793:
URL: https://github.com/apache/kafka/pull/12793#discussion_r1007251372


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -835,6 +836,10 @@ final void fail(long now, Throwable throwable) {
 log.debug("{} failed: {}. Beginning retry #{}",
 this, prettyPrintException(throwable), tries);
 }
+//Temporarily save the last exception of the call,

Review Comment:
   nit: space after `//`.



##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -944,7 +949,7 @@ int handleTimeouts(Collection calls, String msg) {
 Call call = iter.next();
 int remainingMs = calcTimeoutMsRemainingAsInt(now, 
call.deadlineMs);
 if (remainingMs < 0) {
-call.fail(now, new TimeoutException(msg + " Call: " + 
call.callName));
+call.fail(now, new TimeoutException(msg + " Call: " + 
call.callName,call.lastThrowable));

Review Comment:
   I think it's better to clarify it as "the last error causing retry is..", 
also nit: space after `,`.
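   As a hypothetical caller-side sketch (not part of this PR), chaining `call.lastThrowable` into 
the final `TimeoutException` is what lets users surface the real reason behind the retries:

```java
import java.util.Collections;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;

// Illustration only: with the last retriable error chained as the cause, the underlying
// reason (e.g. LeaderNotAvailableException) becomes visible when the admin call times out.
final class AdminTimeoutCauseSketch {

    static void describeAndLogCause(Admin admin, String topic) throws InterruptedException {
        try {
            admin.describeTopics(Collections.singleton(topic)).allTopicNames().get();
        } catch (ExecutionException e) {
            Throwable timeout = e.getCause();                                       // KafkaAdminClient's TimeoutException
            Throwable lastRetryCause = timeout == null ? null : timeout.getCause(); // e.g. LeaderNotAvailableException
            System.err.println("Admin call timed out; last error causing retry: " + lastRetryCause);
        }
    }
}
```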



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14328) KafkaAdminClient should be Changing the exception level When an exception occurs

2022-10-27 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17625256#comment-17625256
 ] 

Guozhang Wang commented on KAFKA-14328:
---

Hello [~shizhenzhen] sorry for the late reply. I was OOO for a while.

I reviewed the ticket and the latest PR, and I think your proposed change makes 
sense; it's less intrusive than logging a warning on each retry, since people 
would be concerned that it floods the log entries. I left some comments on the PR.

> KafkaAdminClient should be Changing the exception level When an exception 
> occurs
> 
>
> Key: KAFKA-14328
> URL: https://issues.apache.org/jira/browse/KAFKA-14328
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 3.3
>Reporter: shizhenzhen
>Priority: Major
> Attachments: image-2022-10-21-11-19-21-064.png, 
> image-2022-10-21-14-56-31-753.png, image-2022-10-21-16-54-40-588.png, 
> image-2022-10-21-16-56-45-448.png, image-2022-10-21-16-58-19-353.png, 
> image-2022-10-24-14-28-10-365.png, image-2022-10-24-14-47-30-641.png, 
> image-2022-10-24-14-48-27-907.png
>
>
>  
>  
> Some of KafkaAdminClient's logging is entirely at log.trace. When an exception occurs you have no idea what the cause is, which makes troubleshooting very difficult.
>  
> For example, here: when the Metadata request is sent, if a queried topic has a partition with leader = -1, an exception is thrown;
>  
> But at that point the exception is actually swallowed. After it is thrown upwards here, it reaches the catch block shown in the second screenshot below.
> That puts the request back into the request queue, and the call then falls into endless retries until the timeout is reached and an exception is thrown: Timed out waiting for a node 
> assignment. Call: metadata
>  
> "Unable to assign a node to the Metadata request": under normal circumstances, who would know that the real underlying exception is actually
>  
> ```
> org.apache.kafka.common.errors.LeaderNotAvailableException: There is no 
> leader for this topic-partition as we are in the middle of a leadership 
> election.
>  
> ```
>  
>  
> !https://user-images.githubusercontent.com/10442648/196944422-e11b732f-6f7f-4f77-8d9c-1f0544257461.png!
>  
>  
> The screenshot below shows the log after I changed it to warn level.
> !image-2022-10-21-11-19-21-064.png!
>  
> So I hope the log.trace here can be changed to log.warn, as a reminder.
> That would make it clear that the current retries are likely being caused by some exception.
>  
>  
> 
>  
>  
> !image-2022-10-21-14-56-31-753.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12783: KAFKA-14334: complete delayed purgatory after replication

2022-10-27 Thread GitBox


jeffkbkim commented on code in PR #12783:
URL: https://github.com/apache/kafka/pull/12783#discussion_r1007253784


##
core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala:
##
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package integration.kafka.server
+
+import kafka.server.{BaseFetchRequestTest, BrokerTopicStats, KafkaConfig}
+import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.admin.Admin
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.ApiKeys
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Timeout}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
+import java.util.Properties
+
+class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
+  val numNodes = 2
+  val numParts = 1
+  val initialMessages = 100
+  val nMessages = 100
+
+  val topic = "test-fetch-from-follower"
+  val leaderBrokerId = 0
+  val followerBrokerId = 1
+  var admin: Admin = null
+
+  def overridingProps: Properties = {
+val props = new Properties
+props.put(KafkaConfig.NumPartitionsProp, numParts.toString)
+
+props
+  }
+
+  override def generateConfigs: collection.Seq[KafkaConfig] =
+TestUtils.createBrokerConfigs(numNodes, zkConnectOrNull, 
enableControlledShutdown = false, enableFetchFromFollower = true)
+  .map(KafkaConfig.fromProps(_, overridingProps))
+
+  @BeforeEach
+  def initializeFetchFromFollowerCluster(): Unit = {
+// Create a 2 broker cluster where broker 0 is the leader and 1 is the 
follower.
+
+admin = TestUtils.createAdminClient(brokers, listenerName)
+TestUtils.createTopicWithAdminRaw(
+  admin,
+  topic,
+  replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId))
+)
+TestUtils.generateAndProduceMessages(brokers, topic, initialMessages)
+  }
+
+  @AfterEach
+  def close(): Unit = {
+if (admin != null) {
+  admin.close()
+}
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  @Timeout(30)
+  def testFollowerCompleteDelayedPurgatoryOnReplication(quorum: String): Unit 
= {
+TestUtils.generateAndProduceMessages(brokers, topic, nMessages)
+// set fetch.max.wait.ms to a value (45 seconds) greater than the timeout 
(30 seconds) to ensure that the
+// test only passes when the delayed fetch purgatory is completed after 
successfully replicating from the leader.
+
+val totalMessages = initialMessages + nMessages
+val topicPartition = new TopicPartition(topic, 0)
+val offsetMap = Map[TopicPartition, Long](
+  topicPartition -> (totalMessages - 1)

Review Comment:
   the suggestion makes sense. i think the point you're getting at is we need 
to have the fetch sit in purgatory before replication. and i confirmed your 
suggestion works locally.
   
   however, i'm still unable to grasp why fetching offset 200 fails. let's say 
"there is no record at offset 200 yet", which means replication is not complete. 
the fetch request will sit in the purgatory and wake up once the record is 
replicated.
   
   could it be the case where the record at offset 200 is already replicated, but 
the follower's HWM is not yet updated to 200, so the fetch request sits in 
purgatory and never wakes up because there are no new records?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #12793: [KAFKA-14328]: KafkaAdminClient should be Changing the exception level …

2022-10-27 Thread GitBox


guozhangwang commented on PR #12793:
URL: https://github.com/apache/kafka/pull/12793#issuecomment-1293946483

   @shirenchuang the checkstyle failures are related to the coding style in 
the PR; you can run the commands locally (there are instructions in `README.md`) 
to check again when you update the PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cowlike commented on a diff in pull request #12769: KAFKA-14314: Add check for null upstreamTopic

2022-10-27 Thread GitBox


cowlike commented on code in PR #12769:
URL: https://github.com/apache/kafka/pull/12769#discussion_r1007313398


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java:
##
@@ -319,4 +320,17 @@ public void testRefreshTopicPartitionsTopicOnTargetFirst() 
throws Exception {
 connector.refreshTopicPartitions();
 verify(connector, times(1)).computeAndCreateTopicPartitions();
 }
+
+@Test
+public void testIsCycleWithNullUpstreamTopic() throws Exception {
+class BadReplicationPolicy extends DefaultReplicationPolicy {

Review Comment:
   > What about `CustomReplicationPolicy` as it's not really a bad policy, 
`null` is allowed
   
   `CustomReplicationPolicy` will not fail the test, since `isCycle` ends up returning 
`false` once `topicSource` on its first line returns null:
   ```java
   boolean isCycle(String topic) {
   String source = replicationPolicy.topicSource(topic);
   //...
   ```
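   
   To make the distinction concrete, here is a minimal sketch (an illustration only, not 
code from the PR) of the kind of policy the new test needs: one whose inherited 
`topicSource` still resolves, so `isCycle` gets past its first line, but whose 
`upstreamTopic` returns null.
   
   ```java
   // Illustrative only: the class name and behaviour are assumptions.
   import org.apache.kafka.connect.mirror.DefaultReplicationPolicy;
   
   public class NullUpstreamReplicationPolicy extends DefaultReplicationPolicy {
       @Override
       public String upstreamTopic(String topic) {
           // The source cluster alias is still derived by DefaultReplicationPolicy,
           // but the upstream topic cannot be determined, so return null here.
           return null;
       }
   }
   ```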



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-27 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r1007411497


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeLocalMetadataStore.java:
##
@@ -0,0 +1,15 @@
+package org.apache.kafka.connect.mirror.clients.admin;

Review Comment:
   Added the license 



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeForwardingAdminWithLocalMetadata.java:
##
@@ -0,0 +1,117 @@
+package org.apache.kafka.connect.mirror.clients.admin;

Review Comment:
   Added the license



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-27 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r1007412233


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:
##
@@ -0,0 +1,596 @@
+/*

Review Comment:
   Fixed the checkstyle issues; `./gradlew checkstyleMain checkstyleTest` should now be 
green



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-27 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r1007424805


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:
##
@@ -0,0 +1,596 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.DescribeConfigsResult;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter;
+import org.apache.kafka.connect.mirror.MirrorCheckpointConnector;
+import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
+import org.apache.kafka.connect.mirror.MirrorMakerConfig;
+import org.apache.kafka.connect.mirror.MirrorSourceConnector;
+import org.apache.kafka.connect.mirror.SourceAndTarget;
+import 
org.apache.kafka.connect.mirror.clients.admin.FakeForwardingAdminWithLocalMetadata;
+import org.apache.kafka.connect.mirror.clients.admin.FakeLocalMetadataStore;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.apache.kafka.connect.util.clusters.UngracefulShutdownException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.connect.mirror.MirrorConnectorConfig.FORWARDING_ADMIN_CLASS;
+import static org.apache.kafka.connect.mirror.TestUtils.generateRecords;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests MM2 is using provided ForwardingAdmin to create/alter topics, 
partitions and ACLs.
+ */
+@Tag("integration")
+public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest {
+protected static final int NUM_RECORDS_PER_PARTITION = 1;
+protected static final int NUM_PARTITIONS = 1;
+protected static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS * 
NUM_RECORDS_PER_PARTITION;
+protected static final Duration CONSUMER_POLL_TIMEOUT_MS = 
Duration.ofMillis(500L);
+protected static final String PRIMARY_CLUSTER_ALIAS = "pr

[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-27 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r1007434322


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:
##
@@ -0,0 +1,596 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.DescribeConfigsResult;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter;
+import org.apache.kafka.connect.mirror.MirrorCheckpointConnector;
+import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
+import org.apache.kafka.connect.mirror.MirrorMakerConfig;
+import org.apache.kafka.connect.mirror.MirrorSourceConnector;
+import org.apache.kafka.connect.mirror.SourceAndTarget;
+import 
org.apache.kafka.connect.mirror.clients.admin.FakeForwardingAdminWithLocalMetadata;
+import org.apache.kafka.connect.mirror.clients.admin.FakeLocalMetadataStore;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.apache.kafka.connect.util.clusters.UngracefulShutdownException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.connect.mirror.MirrorConnectorConfig.FORWARDING_ADMIN_CLASS;
+import static org.apache.kafka.connect.mirror.TestUtils.generateRecords;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests MM2 is using provided ForwardingAdmin to create/alter topics, 
partitions and ACLs.
+ */
+@Tag("integration")
+public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest {
+protected static final int NUM_RECORDS_PER_PARTITION = 1;
+protected static final int NUM_PARTITIONS = 1;
+protected static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS * 
NUM_RECORDS_PER_PARTITION;
+protected static final Duration CONSUMER_POLL_TIMEOUT_MS = 
Duration.ofMillis(500L);
+protected static final String PRIMARY_CLUSTER_ALIAS = "pr

[jira] [Commented] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance

2022-10-27 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17625360#comment-17625360
 ] 

A. Sophie Blee-Goldman commented on KAFKA-14016:


[~aiquestion] would you be interested in submitting a PR for reverting  
KAFKA-13891? It still needs a PR but Luke or I can merge it as soon as the PR 
build finishes running. In the meantime I'll look into the assignor's handling 
of this kind of thing and submit a patch if we're missing the logic for this 
(which from your report, it sounds like we are)

> Revoke more partitions than expected in Cooperative rebalance
> -
>
> Key: KAFKA-14016
> URL: https://issues.apache.org/jira/browse/KAFKA-14016
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.3.0
>Reporter: Shawn Wang
>Priority: Major
>  Labels: new-rebalance-should-fix
>
> In https://issues.apache.org/jira/browse/KAFKA-13419 we found that some 
> consumers didn't reset generation and state after a sync group failed with a 
> REBALANCE_IN_PROGRESS error.
> So we fixed it by resetting the generationId (but not the memberId) when sync group 
> fails with a REBALANCE_IN_PROGRESS error.
> But this change missed the reset part, so another change made in 
> https://issues.apache.org/jira/browse/KAFKA-13891 makes this work.
> After applying this change, we found that consumers will sometimes revoke 
> almost 2/3 of the partitions with cooperative rebalancing enabled, because if a consumer 
> does a very quick re-join, other consumers will get REBALANCE_IN_PROGRESS in 
> syncGroup and revoke their partitions before re-joining. Example:
>  # consumers A1-A10 (ten consumers) joined and synced the group successfully with 
> generation 1 
>  # new consumer B1 joined and started a rebalance
>  # all consumers joined successfully, and then A1 needed to revoke a partition to 
> transfer it to B1
>  # A1 did a very quick syncGroup and re-join, because it revoked a partition
>  # A2-A10 didn't send syncGroup before A1's re-join, so after they send 
> syncGroup they will get REBALANCE_IN_PROGRESS
>  # A2-A10 will revoke their partitions and re-join
> So in this rebalance almost every partition is revoked, which greatly decreases 
> the benefit of cooperative rebalancing.
> I think instead of "{*}resetStateAndRejoin{*} when 
> *RebalanceInProgressException* errors happen in {*}sync group{*}" we need 
> another way to fix it.
> Here is my proposal:
>  # revert the change in https://issues.apache.org/jira/browse/KAFKA-13891
>  # in the server-side coordinator's handleSyncGroup, when the generationId has been 
> checked and the group state is PreparingRebalance, we can send the assignment along with 
> the error code REBALANCE_IN_PROGRESS (I think it's safe since we verified the 
> generation first)
>  # when the client gets the REBALANCE_IN_PROGRESS error, try to apply the 
> assignment first and then set rejoinNeeded = true to make it re-join 
> immediately 
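
A minimal sketch of what step 3 of this proposal could look like on the client side (all 
names here are illustrative assumptions, not the actual consumer coordinator code):

```java
// Sketch only: on REBALANCE_IN_PROGRESS, keep the generation-verified assignment that the
// coordinator sent along with the error and rejoin immediately, instead of resetting state
// and revoking all owned partitions.
public class SyncGroupHandlingSketch {

    /** Hypothetical callbacks into the consumer; not a real Kafka client API. */
    interface RebalanceActions {
        void applyAssignment(byte[] assignment); // hand the assignment to the consumer
        void requestRejoin();                    // schedule an immediate rejoin
    }

    void onSyncGroupResponse(boolean rebalanceInProgress, byte[] assignment, RebalanceActions actions) {
        if (rebalanceInProgress && assignment != null) {
            actions.applyAssignment(assignment); // apply the assignment first
            actions.requestRejoin();             // then rejoin without revoking everything
        }
    }
}
```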



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] Scanteianu commented on pull request #12753: MINOR: Document Offset and Partition 0-indexing, fix typo

2022-10-27 Thread GitBox


Scanteianu commented on PR #12753:
URL: https://github.com/apache/kafka/pull/12753#issuecomment-1294252160

   > @Scanteianu , thanks for the update, but there is still something wrong. 
For the test, I don't think another tests in MockConsumer is needed and 
correct. After all, MockConsumer is a `Mock` one, some behaviors doesn't match 
the real consumer. For the seek test, we already have one here: 
`PlaintextConsumerTest#testSeek`. If you think there are some tests missing, 
you can add in the test.
   > 
   > BTW, `PlaintextConsumerTest` is integration tests, so it'll launch real 
brokers and consumers to do tests. You can test it there.
   > 
   > Thanks.
   
   @showuon thanks for the review, I really appreciate your taking the time to 
look at it, and point me to the right places in the code to add tests and the 
configs that specify behaviour!
   
   I have worked with people who believe mocks should be as close to the 
behaviour of the "real deal" as possible (and I am partial to this opinion 
myself), but I'll maybe open another PR for that, as I think that is more of a 
stylistic opinion (whether Apache has this codified somewhere, I don't know). 
I'd really like to improve the seek documentation here, as I spent quite a few 
hours googling for this stuff. 
   
   For now, I've moved the tests to the Plaintext Consumer, and added tests for 
the various auto.offset.reset configs. The behaviours in latest seem super 
weird to me, based on your explanation - they don't seem to match 
https://medium.com/lydtech-consulting/kafka-consumer-auto-offset-reset-d3962bad2665
 specifically for the `latest` value. 
   
   In the case of `latest`, if the next batch of records contains the offset 
that is passed to seek, it seems to skip to that offset, which is the same 
behaviour that the mock consumer had, which kind of makes sense. However, if 
the next batch of records doesn't contain the offset last passed to seek, it 
resets to the very beginning (offset 0). Either I am missing something, or this 
might be a bug?
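   
   A small, self-contained sketch of the kind of experiment described above (broker 
address, topic, group id, and the seek offset are placeholder assumptions, not values from 
this PR): seek past the log end offset and observe where `auto.offset.reset` kicks in.
   
   ```java
   import org.apache.kafka.clients.consumer.ConsumerConfig;
   import org.apache.kafka.clients.consumer.ConsumerRecords;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.common.TopicPartition;
   import org.apache.kafka.common.serialization.StringDeserializer;
   
   import java.time.Duration;
   import java.util.Collections;
   import java.util.Properties;
   
   public class SeekResetExample {
       public static void main(String[] args) {
           Properties props = new Properties();
           props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           props.put(ConsumerConfig.GROUP_ID_CONFIG, "seek-reset-demo");
           // "earliest" resets to the log start offset, "latest" to the log end offset,
           // and "none" throws, whenever the requested offset is out of range.
           props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
           props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
           props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
   
           try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
               TopicPartition tp = new TopicPartition("test-topic", 0);
               consumer.assign(Collections.singletonList(tp));
               consumer.seek(tp, 1_000_000L); // deliberately beyond the log end offset
               ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
               System.out.printf("fetched %d records, next position %d%n",
                       records.count(), consumer.position(tp));
           }
       }
   }
   ```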


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12783: KAFKA-14334: complete delayed purgatory after replication

2022-10-27 Thread GitBox


jeffkbkim commented on code in PR #12783:
URL: https://github.com/apache/kafka/pull/12783#discussion_r1007536584


##
core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala:
##
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package integration.kafka.server
+
+import kafka.server.{BaseFetchRequestTest, BrokerTopicStats, KafkaConfig}
+import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.admin.Admin
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.ApiKeys
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Timeout}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
+import java.util.Properties
+
+class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
+  val numNodes = 2
+  val numParts = 1
+  val initialMessages = 100
+  val nMessages = 100
+
+  val topic = "test-fetch-from-follower"
+  val leaderBrokerId = 0
+  val followerBrokerId = 1
+  var admin: Admin = null
+
+  def overridingProps: Properties = {
+val props = new Properties
+props.put(KafkaConfig.NumPartitionsProp, numParts.toString)
+
+props
+  }
+
+  override def generateConfigs: collection.Seq[KafkaConfig] =
+TestUtils.createBrokerConfigs(numNodes, zkConnectOrNull, 
enableControlledShutdown = false, enableFetchFromFollower = true)
+  .map(KafkaConfig.fromProps(_, overridingProps))
+
+  @BeforeEach
+  def initializeFetchFromFollowerCluster(): Unit = {
+// Create a 2 broker cluster where broker 0 is the leader and 1 is the 
follower.
+
+admin = TestUtils.createAdminClient(brokers, listenerName)
+TestUtils.createTopicWithAdminRaw(
+  admin,
+  topic,
+  replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId))
+)
+TestUtils.generateAndProduceMessages(brokers, topic, initialMessages)
+  }
+
+  @AfterEach
+  def close(): Unit = {
+if (admin != null) {
+  admin.close()
+}
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  @Timeout(30)
+  def testFollowerCompleteDelayedPurgatoryOnReplication(quorum: String): Unit 
= {
+TestUtils.generateAndProduceMessages(brokers, topic, nMessages)
+// set fetch.max.wait.ms to a value (45 seconds) greater than the timeout 
(30 seconds) to ensure that the
+// test only passes when the delayed fetch purgatory is completed after 
successfully replicating from the leader.
+
+val totalMessages = initialMessages + nMessages
+val topicPartition = new TopicPartition(topic, 0)
+val offsetMap = Map[TopicPartition, Long](
+  topicPartition -> (totalMessages - 1)

Review Comment:
   ok, looking deeper into this, here's my understanding as to why fetching 
offset 200 did not work:
   
   The log end offset points to the next offset the log will append a record, 
so technically it doesn't exist yet. furthermore, the high watermark is updated 
to the log end offset. in `Partition.maybeIncrementLeaderHW()`:
   ```
   var newHighWatermark = leaderLogEndOffset
   ...
   leaderLog.maybeIncrementHighWatermark(newHighWatermark)
   ```
   even if the fetch offset is equal to the high watermark, we cannot fetch it 
if the high watermark is pointing to the log end offset. 
   
   this is done in DelayedFetch where we skip accumulating bytes `if 
(endOffset.messageOffset (LOE) == fetchOffset.messageOffset (HWM))`
   
   @dajac does that align with your understanding?
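   
   A simplified model of the condition described above (an illustration in Java; the real 
logic lives in the Scala `DelayedFetch` class and compares offset metadata, not bare longs):
   
   ```java
   // A delayed fetch only counts bytes toward fetch.min.bytes once the end offset has
   // moved past the fetch offset. A fetch issued exactly at HWM == LEO therefore sees
   // nothing to accumulate and stays parked in purgatory until another append arrives.
   public class DelayedFetchSketch {
       static boolean canComplete(long endOffset, long fetchOffset, int availableBytes, int minBytes) {
           if (endOffset > fetchOffset) {
               return availableBytes >= minBytes; // new data beyond the fetch offset
           }
           return false; // endOffset == fetchOffset: nothing new, keep waiting
       }
   
       public static void main(String[] args) {
           System.out.println(canComplete(200, 200, 0, 1));   // false: fetch at HWM == LEO == 200
           System.out.println(canComplete(201, 200, 100, 1)); // true: one more record replicated
       }
   }
   ```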



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12783: KAFKA-14334: complete delayed purgatory after replication

2022-10-27 Thread GitBox


jeffkbkim commented on code in PR #12783:
URL: https://github.com/apache/kafka/pull/12783#discussion_r1007544878


##
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala:
##
@@ -132,9 +141,17 @@ class ReplicaFetcherThread(name: String,
 
 brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)
 
+logAppendInfo.foreach { _ => partitionsWithNewRecords += topicPartition }

Review Comment:
   i believe that is the case. from 
[UnifiedLog.updateHighWatermarkWithLogEndOffset()](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/UnifiedLog.scala#L597-L603)
   ```
 private def updateHighWatermarkWithLogEndOffset(): Unit = {
   // Update the high watermark in case it has gotten ahead of the log end 
offset following a truncation
   // or if a new segment has been rolled and the offset metadata needs to 
be updated.
   if (highWatermark >= localLog.logEndOffset) {
 updateHighWatermarkMetadata(localLog.logEndOffsetMetadata)
   }
 }
   ```
   this is called on every `UnifiedLog.append()` and `UnifiedLog.roll()`
   
   however, i did notice that `LeaderHWChange` is not set anywhere else except 
in the produce code path. from `Partition.appendRecordsToLeader()`:
   ```
   info.copy(leaderHwChange = if (leaderHWIncremented) LeaderHwChange.Increased 
else LeaderHwChange.Same)
   ```
   no other `LogAppendInfo` initialization passes in LeaderHWChange.
   
   so, the follower would need to keep track of whether the high watermark 
changed for each partition which i don't think is something we want to do. 
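   
   For context, a hypothetical sketch of the direction the PR takes instead (names are 
assumptions, not the actual ReplicaFetcherThread code): remember which partitions received 
records while processing the fetch response, then poke the delayed-fetch purgatory for 
exactly those partitions, so no per-partition high-watermark tracking is needed.
   
   ```java
   import java.util.List;
   
   public class CompleteDelayedFetchSketch {
       /** Hypothetical handle onto the follower's delayed-fetch purgatory. */
       interface FetchPurgatory {
           void checkAndComplete(String topicPartitionKey);
       }
   
       void afterProcessingFetchResponse(List<String> partitionsWithNewRecords, FetchPurgatory purgatory) {
           for (String tp : partitionsWithNewRecords) {
               purgatory.checkAndComplete(tp); // wake up fetches waiting on this partition
           }
       }
   }
   ```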



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12783: KAFKA-14334: complete delayed purgatory after replication

2022-10-27 Thread GitBox


jeffkbkim commented on code in PR #12783:
URL: https://github.com/apache/kafka/pull/12783#discussion_r1007549176


##
core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala:
##
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package integration.kafka.server
+
+import kafka.server.{BaseFetchRequestTest, BrokerTopicStats, KafkaConfig}
+import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.admin.Admin
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.ApiKeys
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Timeout}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
+import java.util.Properties
+
+class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
+  val numNodes = 2
+  val numParts = 1
+  val initialMessages = 100
+  val nMessages = 100
+
+  val topic = "test-fetch-from-follower"
+  val leaderBrokerId = 0
+  val followerBrokerId = 1
+  var admin: Admin = null
+
+  def overridingProps: Properties = {
+val props = new Properties
+props.put(KafkaConfig.NumPartitionsProp, numParts.toString)
+
+props
+  }
+
+  override def generateConfigs: collection.Seq[KafkaConfig] =
+TestUtils.createBrokerConfigs(numNodes, zkConnectOrNull, 
enableControlledShutdown = false, enableFetchFromFollower = true)
+  .map(KafkaConfig.fromProps(_, overridingProps))
+
+  @BeforeEach
+  def initializeFetchFromFollowerCluster(): Unit = {
+// Create a 2 broker cluster where broker 0 is the leader and 1 is the 
follower.
+
+admin = TestUtils.createAdminClient(brokers, listenerName)
+TestUtils.createTopicWithAdminRaw(
+  admin,
+  topic,
+  replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId))
+)
+TestUtils.generateAndProduceMessages(brokers, topic, initialMessages)
+  }
+
+  @AfterEach
+  def close(): Unit = {
+if (admin != null) {
+  admin.close()
+}
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  @Timeout(30)
+  def testFollowerCompleteDelayedPurgatoryOnReplication(quorum: String): Unit 
= {
+TestUtils.generateAndProduceMessages(brokers, topic, nMessages)
+// set fetch.max.wait.ms to a value (45 seconds) greater than the timeout 
(30 seconds) to ensure that the
+// test only passes when the delayed fetch purgatory is completed after 
successfully replicating from the leader.
+
+val totalMessages = initialMessages + nMessages
+val topicPartition = new TopicPartition(topic, 0)
+val offsetMap = Map[TopicPartition, Long](
+  topicPartition -> (totalMessages - 1)

Review Comment:
   also, i confirmed that the existing test fails when the follower is already 
up-to-date. the test above should work generally because the fetch request 
reaches the follower before the follower is fully replicated. will update the 
test



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] opencmit2 commented on pull request #12788: [Test]Testing Gradle OOM

2022-10-27 Thread GitBox


opencmit2 commented on PR #12788:
URL: https://github.com/apache/kafka/pull/12788#issuecomment-1294428194

   Note: DefaultCachedClasspathTransformer.
   
   The latest code has been modified to limit the number of threads; it is now 
tied to the number of CPU cores.
   
   Another option is to limit the number of threads in Gradle's jar transform thread pool.
   
   We can approach this from these two angles.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


