[jira] [Commented] (KAFKA-17395) Flaky test testMissingOffsetNoResetPolicy for new consumer

2024-08-30 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17877989#comment-17877989
 ] 

PoAn Yang commented on KAFKA-17395:
---

Hi [~taijuwu], yeah, like you said, if {{CoordinatorRequestManager}} sends 
{{FindCoordinatorRequest}} before {{MockClient#prepareResponseFrom}}, the 
response has no chance to match the request. However, refactoring {{MockClient}} 
is a big effort. I think we can just move {{MockClient#prepareResponseFrom}} 
before {{newConsumer}}, so there will be a response in the list before the 
request. I will file a PR later. Thanks.
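
A minimal sketch of the reordering, assuming the existing KafkaConsumerTest fixtures and helpers ({{client}}, {{coordinator}}, {{groupId}}, {{newConsumer}}); the exact signatures are illustrative:

```java
// Queue the coordinator response first, so MockClient always has a
// matching response ready.
client.prepareResponseFrom(
    FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, coordinator),
    coordinator);

// Only then create the consumer; its background thread may send
// FindCoordinatorRequest at any point after construction.
KafkaConsumer<String, String> consumer =
    newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
```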

> Flaky test testMissingOffsetNoResetPolicy for new consumer
> --
>
> Key: KAFKA-17395
> URL: https://issues.apache.org/jira/browse/KAFKA-17395
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, flaky-test
>
> KafkaConsumerTest.testMissingOffsetNoResetPolicy is flaky when running for 
> the new consumer (passing consistently for the classic consumer). 
> Fails with: 
> org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. 
> Consumer was not able to update fetch positions on continuous calls with 0 
> timeout ==> expected:  but was: 
> It's been flaky since it was enabled for the new consumer with 
> [https://github.com/apache/kafka/pull/16587] 
> See the last couple of months of runs on trunk showing the flakiness: 
> [https://ge.apache.org/scans/tests?search.names=Git%20branch&search.rootProjectNames=kafka&search.startTimeMax=172429919&search.startTimeMin=171721440&search.timeZoneId=America%2FToronto&search.values=trunk&tests.container=org.apache.kafka.clients.consumer.KafkaConsumerTest&tests.test=testMissingOffsetNoResetPolicy(GroupProtocol)%5B2%5D]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-17395) Flaky test testMissingOffsetNoResetPolicy for new consumer

2024-08-30 Thread PoAn Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

PoAn Yang reassigned KAFKA-17395:
-

Assignee: PoAn Yang

> Flaky test testMissingOffsetNoResetPolicy for new consumer
> --
>
> Key: KAFKA-17395
> URL: https://issues.apache.org/jira/browse/KAFKA-17395
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: PoAn Yang
>Priority: Major
>  Labels: consumer-threading-refactor, flaky-test
>
> KafkaConsumerTest.testMissingOffsetNoResetPolicy is flaky when running for 
> the new consumer (passing consistently for the classic consumer). 
> Fails with: 
> org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. 
> Consumer was not able to update fetch positions on continuous calls with 0 
> timeout ==> expected:  but was: 
> It's been flaky since it was enabled for the new consumer with 
> [https://github.com/apache/kafka/pull/16587] 
> See the last couple of months of runs on trunk showing the flakiness: 
> [https://ge.apache.org/scans/tests?search.names=Git%20branch&search.rootProjectNames=kafka&search.startTimeMax=172429919&search.startTimeMin=171721440&search.timeZoneId=America%2FToronto&search.values=trunk&tests.container=org.apache.kafka.clients.consumer.KafkaConsumerTest&tests.test=testMissingOffsetNoResetPolicy(GroupProtocol)%5B2%5D]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Kafka 8850: Updated documentation to clarify fetch.min.bytes behaviour. [kafka]

2024-08-30 Thread via GitHub


abhi-ksolves commented on PR #16749:
URL: https://github.com/apache/kafka/pull/16749#issuecomment-2320311037

   Hi @mjsax,
   Can you review this PR? 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-17041) Add pagination when describe large set of metadata via Admin API

2024-08-30 Thread Lin Siyuan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17877992#comment-17877992
 ] 

Lin Siyuan commented on KAFKA-17041:


Hi [~omnia_h_ibrahim], I have been following this KIP for a long time. If it 
passes the vote, I hope to be able to participate in updating one of the 
interfaces. Thank you.

> Add pagination when describe large set of metadata via Admin API 
> -
>
> Key: KAFKA-17041
> URL: https://issues.apache.org/jira/browse/KAFKA-17041
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Omnia Ibrahim
>Assignee: Omnia Ibrahim
>Priority: Major
> Attachments: image-2024-08-01-14-08-00-999.png, 
> image-2024-08-19-17-32-08-015.png
>
>
> Some of the requests via the Admin API time out on large clusters or clusters 
> with a large set of specific metadata. For example, OffsetFetchRequest and 
> DescribeLogDirsRequest time out due to the large number of partitions on the 
> cluster. Also, DescribeProducersRequest and ListTransactionsRequest time out 
> due to too many short-lived PIDs or too many hanging transactions.
> [KIP-1062: Introduce Pagination for some requests used by Admin 
> API|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1062%3A+Introduce+Pagination+for+some+requests+used+by+Admin+API]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17445) Kafka streams keeps rebalancing with the following reasons

2024-08-30 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878006#comment-17878006
 ] 

Bruno Cadonna commented on KAFKA-17445:
---

[~rohitbobade] It is not clear to me what you tried to achieve by setting 
{{group.instance.id}}. Could you please elaborate?
Did you increase {{session.timeout.ms}} as described in the config definition 
(https://kafka.apache.org/documentation/#consumerconfigs_group.instance.id)?
Could you describe the exact steps?
Did you delete the consumer group on the broker between the attempts?
Was this a new Streams app or an existing one? 
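
For reference, static membership is typically configured along these lines (a sketch with illustrative values):

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Static membership: a stable, unique id per instance (e.g. derived from the pod name).
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG), "app-instance-1");
// Raise the session timeout so a bounced static member can rejoin within the
// session instead of triggering a rebalance; it must stay below the broker's
// group.max.session.timeout.ms.
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 300000);
```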

> Kafka streams keeps rebalancing with the following reasons
> --
>
> Key: KAFKA-17445
> URL: https://issues.apache.org/jira/browse/KAFKA-17445
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.8.0
>Reporter: Rohit Bobade
>Priority: Major
>
> We recently upgraded Kafka streams version to 3.8.0 and are seeing that the 
> streams app keeps rebalancing and does not process any events
> We have explicitly set the config 
> GROUP_INSTANCE_ID_CONFIG
> This is what we see on the broker logs:
> [GroupCoordinator 2]: Preparing to rebalance group \{consumer-group-name} in 
> state PreparingRebalance with old generation 24781 (__consumer_offsets-29) 
> (reason: Updating metadata for static member {} with instance id {}; client 
> reason: rebalance failed due to UnjoinedGroupException)
> We also tried to remove the GROUP_INSTANCE_ID_CONFIG, but then we still see 
> these logs, and rebalancing continues with no processing
> sessionTimeoutMs=45000, rebalanceTimeoutMs=180, 
> supportedProtocols=List(stream)) has left group \{groupId} through explicit 
> `LeaveGroup`; client reason: the consumer unsubscribed from all topics 
> (kafka.coordinator.group.GroupCoordinator)
> other logs show:
> during Stable; client reason: need to revoke partitions and re-join)
> client reason: triggered followup rebalance scheduled for 0
> On the application logs we see:
> 1. state being restored from changelog topic
> 2. INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread  at state RUNNING: partitions  lost due to missed rebalance.
> Detected that the thread is being fenced. This implies that this thread 
> missed a rebalance and dropped out of the consumer group. Will close out all 
> assigned tasks and rejoin the consumer group.
>  
> 3. Task Migrated exceptions
> org.apache.kafka.streams.errors.TaskMigratedException: Error encountered 
> sending record to topic
> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer with 
> transactionalId
> attempted to produce with an old epoch
> Written offsets would not be recorded and no more records would be sent since 
> the producer is fenced, indicating the task may be migrated out; it means all 
> tasks belonging to this thread should be migrated.
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:306)
>  ~[kafka-streams-3.8.0.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$1(RecordCollectorImpl.java:286)
>  ~[kafka-streams-3.8.0.jar:?]
> at 
> datadog.trace.instrumentation.kafka_clients.KafkaProducerCallback.onCompletion(KafkaProducerCallback.java:44)
>  ~[?:?]
> at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1106)
>  ~[kafka-clients-3.8.0.jar:?]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17395) Flaky test testMissingOffsetNoResetPolicy for new consumer

2024-08-30 Thread TaiJuWu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878014#comment-17878014
 ] 

TaiJuWu commented on KAFKA-17395:
-

Hi [~yangpoan],
You always have a better solution :)

> Flaky test testMissingOffsetNoResetPolicy for new consumer
> --
>
> Key: KAFKA-17395
> URL: https://issues.apache.org/jira/browse/KAFKA-17395
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: PoAn Yang
>Priority: Major
>  Labels: consumer-threading-refactor, flaky-test
>
> KafkaConsumerTest.testMissingOffsetNoResetPolicy is flaky when running for 
> the new consumer (passing consistently for the classic consumer). 
> Fails with: 
> org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. 
> Consumer was not able to update fetch positions on continuous calls with 0 
> timeout ==> expected:  but was: 
> It's been flaky since it was enabled for the new consumer with 
> [https://github.com/apache/kafka/pull/16587] 
> See the last couple of months of runs on trunk showing the flakiness: 
> [https://ge.apache.org/scans/tests?search.names=Git%20branch&search.rootProjectNames=kafka&search.startTimeMax=172429919&search.startTimeMin=171721440&search.timeZoneId=America%2FToronto&search.values=trunk&tests.container=org.apache.kafka.clients.consumer.KafkaConsumerTest&tests.test=testMissingOffsetNoResetPolicy(GroupProtocol)%5B2%5D]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-16448 Add timestamp to error handler context [kafka]

2024-08-30 Thread via GitHub


sebastienviale opened a new pull request, #17054:
URL: https://github.com/apache/kafka/pull/17054

   This PR is part of 
[KAFKA-16448](https://issues.apache.org/jira/browse/KAFKA-16448) which aims to 
bring a ProcessingExceptionHandler to Kafka Streams in order to deal with 
exceptions that occur during processing.
   
   This PR adds the timestamp to the ErrorHandlerContext interface.
   
   Jira: https://issues.apache.org/jira/browse/KAFKA-16448.
   
   Contributors
   @Dabz
   @sebastienviale
   @loicgreffier
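
   For context, the interface change is roughly of this shape (a sketch, not the exact diff; see the PR for the real signatures):
   
   ```java
   public interface ErrorHandlerContext {
       // ... existing accessors such as topic(), partition(), offset() ...
   
       /**
        * The timestamp of the record that was being processed when the
        * error occurred, as epoch milliseconds.
        */
       long timestamp();
   }
   ```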


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Changed fetch queue processing to reduce the no. of locking and unlocking activity [kafka]

2024-08-30 Thread via GitHub


adixitconfluent opened a new pull request, #17055:
URL: https://github.com/apache/kafka/pull/17055

   ### About
   For share group fetch request processing, we have a recursive approach to 
dealing with individual fetch requests. While it works fine with fewer records 
(< 1,000,000) and less sharing (< 5 share consumers), some requests get stuck 
when we increase the load and try to increase the throughput. I've replaced 
this approach by removing the unlocking and locking of the fetch queue between 
entries. This reduces the complexity and also removes the reliability issue 
under increased load.
   
   ### Testing
   The code has been tested with the help of unit tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)

2024-08-30 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878022#comment-17878022
 ] 

Chia-Ping Tsai commented on KAFKA-16792:


AsyncConsumer close(0) has a similar issue in that it needs to wait for the 
network thread to trigger something. Hence, the “small” timeout makes the call 
(poll/close) return before the network thread completes the event.

In summary, poll(0) may not trigger the FindCoordinator request before 
returning, and close(0) may not trigger the rebalance listener. Both behaviors 
are inconsistent with the classic consumer.

We should document the new behavior if AsyncConsumer does want to honor the 
timeout.

 

> Enable consumer unit tests that fail to fetch offsets only for new consumer 
> with poll(0)
> 
>
> Key: KAFKA-16792
> URL: https://issues.apache.org/jira/browse/KAFKA-16792
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lianet Magrans
>Assignee: PoAn Yang
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 4.0.0
>
>
> Enable the following unit tests for the new async consumer in 
> KafkaConsumerTest:
>  - testCurrentLag
>  - testFetchStableOffsetThrowInPoll
>  - testListOffsetShouldUpdateSubscriptions
>  - testPollReturnsRecords
>  - testResetToCommittedOffset
>  - testResetUsingAutoResetPolicy
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-17424: Memory optimisation for Kafka-connect [kafka]

2024-08-30 Thread via GitHub


ajit97singh closed pull request #17002: KAFKA-17424: Memory optimisation for 
Kafka-connect
URL: https://github.com/apache/kafka/pull/17002


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14572: Migrate EmbeddedKafkaCluster used by Streams integration tests from EmbeddedZookeeper to KRaft [kafka]

2024-08-30 Thread via GitHub


lucasbru commented on PR #17016:
URL: https://github.com/apache/kafka/pull/17016#issuecomment-2320694184

   @OmniaGM Looks like something in kraft is just taking longer? Have we not 
observed this problem with the connect unit tests? I wonder if there are some 
settings that we could use to speed things up.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-17447) Changed fetch queue processing to reduce the no. of locking and unlocking activity

2024-08-30 Thread Abhinav Dixit (Jira)
Abhinav Dixit created KAFKA-17447:
-

 Summary: Changed fetch queue processing to reduce the no. of 
locking and unlocking activity
 Key: KAFKA-17447
 URL: https://issues.apache.org/jira/browse/KAFKA-17447
 Project: Kafka
  Issue Type: Sub-task
Reporter: Abhinav Dixit
Assignee: Abhinav Dixit


For share group fetch request processing, we have a recursive approach to 
dealing with individual fetch requests. While it works fine with fewer records 
(< 1,000,000) and less sharing (< 5 share consumers), some requests get stuck 
when we increase the load and try to increase the throughput. I've replaced 
this approach by removing the unlocking and locking of the fetch queue between 
entries. This reduces the complexity and also removes the reliability issue 
under increased load.
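
Schematically, the change replaces per-entry lock churn with a single drain under the lock. A simplified sketch with hypothetical names, not the actual share-fetch code:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;

class FetchQueueProcessor {
    private final ReentrantLock lock = new ReentrantLock();
    private final Queue<Runnable> fetchQueue = new ArrayDeque<>();

    // Before: the lock is released and re-acquired around every entry.
    void processPerEntry() {
        while (true) {
            Runnable entry;
            lock.lock();
            try {
                entry = fetchQueue.poll();
            } finally {
                lock.unlock(); // lock churn between every entry
            }
            if (entry == null) return;
            entry.run();
        }
    }

    // After: hold the lock once, drain everything queued so far, then
    // process outside the lock, with no per-entry unlock/lock cycles.
    void processBatch() {
        Queue<Runnable> drained = new ArrayDeque<>();
        lock.lock();
        try {
            Runnable entry;
            while ((entry = fetchQueue.poll()) != null) {
                drained.add(entry);
            }
        } finally {
            lock.unlock();
        }
        drained.forEach(Runnable::run);
    }
}
```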



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16968) Make 3.8-IV0 a stable MetadataVersion and create 3.9-IV0

2024-08-30 Thread TaiJuWu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TaiJuWu resolved KAFKA-16968.
-
Resolution: Fixed

> Make 3.8-IV0 a stable MetadataVersion and create 3.9-IV0
> 
>
> Key: KAFKA-16968
> URL: https://issues.apache.org/jira/browse/KAFKA-16968
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.8.0
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Blocker
>
> Before we can release 3.8, we must make 3.8-IV0 a stable MetadataVersion and 
> create 3.9-IV0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17446) Kafka streams stuck in rebalancing

2024-08-30 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878040#comment-17878040
 ] 

Bruno Cadonna commented on KAFKA-17446:
---

[~rohitbobade] Could you please share some more details? Logs preferably on 
{{DEBUG}} level would be great!

> Kafka streams stuck in rebalancing
> --
>
> Key: KAFKA-17446
> URL: https://issues.apache.org/jira/browse/KAFKA-17446
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.8.0
>Reporter: Rohit Bobade
>Priority: Major
>
> Kafka streams stuck in endless rebalancing with the following error:
> org.apache.kafka.streams.errors.LockException: stream-thread task [0_1] 
> Failed to lock the state directory for task 0_1
> org.apache.kafka.streams.processor.internals.TaskManager - stream-thread 
> Encountered lock exception. Reattempting locking the state in the next 
> iteration.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-17293: New consumer HeartbeatRequestManager should rediscover disconnected coordinator [kafka]

2024-08-30 Thread via GitHub


lianetm commented on code in PR #16844:
URL: https://github.com/apache/kafka/pull/16844#discussion_r1738389019


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##
@@ -122,6 +123,23 @@ NetworkClientDelegate.UnsentRequest 
makeFindCoordinatorRequest(final long curren
 });
 }
 
+/**
+ * Handles the disconnection of the current coordinator.
+ * This method checks if the given exception is an instance of {@link 
DisconnectException}.
+ * If so, it marks the coordinator as unknown, indicating that the client 
should
+ * attempt to discover a new coordinator. For any other exception type, no 
action is performed.
+ *
+ * @param exception The exception to handle, which was received as 
part of a request response.
+ *  If this is an instance of {@link 
DisconnectException}, the coordinator is marked as unknown.
+ *  For other types of exceptions, no action is 
performed.

Review Comment:
   nit: this is already said right above, so let's just remove it from here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17293: New consumer HeartbeatRequestManager should rediscover disconnected coordinator [kafka]

2024-08-30 Thread via GitHub


frankvicky commented on code in PR #16844:
URL: https://github.com/apache/kafka/pull/16844#discussion_r1738393452


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##
@@ -122,6 +123,23 @@ NetworkClientDelegate.UnsentRequest 
makeFindCoordinatorRequest(final long curren
 });
 }
 
+/**
+ * Handles the disconnection of the current coordinator.
+ * This method checks if the given exception is an instance of {@link 
DisconnectException}.
+ * If so, it marks the coordinator as unknown, indicating that the client 
should
+ * attempt to discover a new coordinator. For any other exception type, no 
action is performed.
+ *
+ * @param exception The exception to handle, which was received as 
part of a request response.
+ *  If this is an instance of {@link 
DisconnectException}, the coordinator is marked as unknown.
+ *  For other types of exceptions, no action is 
performed.

Review Comment:
   Yes, it's a little verbose. I will remove it 👍🏼 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)

2024-08-30 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878054#comment-17878054
 ] 

Lianet Magrans commented on KAFKA-16792:


Agree with [~chia7712]'s point. We've had this same situation, and it is 
expected in all these tests that basically make an API call (app thread) and 
check right after that a request was generated. For the new consumer that does 
not happen right away: all requests are generated in the background thread, 
once it has had time to runOnce and process the event triggered by the API 
call. So the waitForCondition seems sensible at this test level (these tests 
are really checking the internals, so they need to deal with it). In the API 
implementations, we cover this delay and try to hide it as much as possible by 
blocking with addAndGet(event) to wait for the background thread to process the 
event/requests.  
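
The test-level pattern is essentially the following (a generic sketch; in the real tests this role is played by Kafka's TestUtils.waitForCondition):

```java
import java.util.function.BooleanSupplier;

final class TestWait {
    // Poll a condition until it holds or the timeout expires. This absorbs the
    // delay between the API call in the app thread and the background thread
    // generating the corresponding request.
    static void waitForCondition(BooleanSupplier condition, long timeoutMs, String message)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError(
                    "Condition not met within timeout " + timeoutMs + ". " + message);
            }
            Thread.sleep(10L); // yield so the background thread can make progress
        }
    }
}
```

A test would then assert, e.g., {{waitForCondition(() -> client.requests().size() > 0, 15000, "request was not generated")}} instead of checking immediately after the API call.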

> Enable consumer unit tests that fail to fetch offsets only for new consumer 
> with poll(0)
> 
>
> Key: KAFKA-16792
> URL: https://issues.apache.org/jira/browse/KAFKA-16792
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lianet Magrans
>Assignee: PoAn Yang
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 4.0.0
>
>
> Enable the following unit tests for the new async consumer in 
> KafkaConsumerTest:
>  - testCurrentLag
>  - testFetchStableOffsetThrowInPoll
>  - testListOffsetShouldUpdateSubscriptions
>  - testPollReturnsRecords
>  - testResetToCommittedOffset
>  - testResetUsingAutoResetPolicy
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-17442: Handled persister errors with write async calls (KIP-932) [kafka]

2024-08-30 Thread via GitHub


apoorvmittal10 commented on PR #16956:
URL: https://github.com/apache/kafka/pull/16956#issuecomment-2320893368

   @junrao Can I please get a review on this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17442: Handled persister errors with write async calls (KIP-932) [kafka]

2024-08-30 Thread via GitHub


apoorvmittal10 commented on PR #16956:
URL: https://github.com/apache/kafka/pull/16956#issuecomment-2320892038

   > Thanks for the PR. I had a query: what will happen to the ongoing write 
state requests to the persister when you close a share partition? I understand 
that it is fine if some write state calls are not completed, but will it cause 
any problems during close of `SharePartitionManager`?
   
   The broker must wait for the pending requests to be completed, so I think it 
should not be a problem unless the persister never replies.
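
   That waiting behaviour is commonly implemented by tracking in-flight futures and joining them, with a bound, on close. A generic sketch (not the actual `SharePartitionManager` code):
   
   ```java
   import java.util.Set;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   
   class PendingWrites {
       private final Set<CompletableFuture<Void>> inFlight = ConcurrentHashMap.newKeySet();
   
       // Register a write-state future and deregister it on completion.
       void track(CompletableFuture<Void> writeStateFuture) {
           inFlight.add(writeStateFuture);
           writeStateFuture.whenComplete((r, e) -> inFlight.remove(writeStateFuture));
       }
   
       // On close: wait (bounded) for outstanding persister writes rather than
       // dropping them; the timeout guards against a persister that never replies.
       void close(long timeoutMs) throws Exception {
           try {
               CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0]))
                       .get(timeoutMs, TimeUnit.MILLISECONDS);
           } catch (TimeoutException e) {
               // log and proceed; remaining writes are abandoned
           }
       }
   }
   ```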


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-17395: Flaky test testMissingOffsetNoResetPolicy for new consumer [kafka]

2024-08-30 Thread via GitHub


FrankYang0529 opened a new pull request, #17056:
URL: https://github.com/apache/kafka/pull/17056

   In `AsyncKafkaConsumer`, `FindCoordinatorRequest` is sent by the background 
thread. `MockClient#prepareResponseFrom` only matches the response to a future 
request. If there is a race condition, a `FindCoordinatorResponse` may not 
match any `FindCoordinatorRequest`. It's better to put 
`MockClient#prepareResponseFrom` before the request to avoid the flaky test.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-17448) New consumer seek should update positions in background thread

2024-08-30 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-17448:
--

 Summary: New consumer seek should update positions in background 
thread
 Key: KAFKA-17448
 URL: https://issues.apache.org/jira/browse/KAFKA-17448
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 3.7.1, 3.8.0, 3.7.0
Reporter: Lianet Magrans


In the new AsyncKafkaConsumer, a call to seek will update the positions in 
subscription state for the assigned partitions in the app thread 
([https://github.com/apache/kafka/blob/c23b6b0365af5c58b76d8ad3fb628f766f95348f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L796])

This could lead to race conditions like we've seen when subscription state 
changes in the app thread (over a set of assigned partitions), that could have 
been modified in the background thread, leading to errors on "No current 
assignment for partition " 
[https://github.com/apache/kafka/blob/c23b6b0365af5c58b76d8ad3fb628f766f95348f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L378]
 

Also, the positions update is moved to the background with KAFKA-17066 for the same 
reason, so even if the assignment does not change, we could have a race between 
the background setting positions to the committed offsets for instance, and the 
app thread setting them manually via seek. 

To avoid all of the above, we should have seek generate an event, send it to 
the background, and then update the subscription state when processing that 
event (similar to other API calls, e.g. assign with KAFKA-17064)
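
A hypothetical sketch of the event-based seek ({{SeekEvent}} and the handler wiring are illustrative names, not existing classes; {{seekUnvalidated}} is the existing SubscriptionState method):

```java
// App thread: enqueue an event instead of mutating SubscriptionState
// directly, and block until the background thread has applied it
// (same addAndGet pattern as assign after KAFKA-17064).
public void seek(TopicPartition partition, long offset) {
    applicationEventHandler.addAndGet(new SeekEvent(partition, offset));
}

// Background thread: the single writer of SubscriptionState, so the
// "No current assignment for partition" race cannot occur.
private void process(SeekEvent event) {
    subscriptions.seekUnvalidated(
        event.partition(),
        new SubscriptionState.FetchPosition(event.offset()));
    event.future().complete(null);
}
```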



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17448) New consumer seek should update positions in background thread

2024-08-30 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878065#comment-17878065
 ] 

Lianet Magrans commented on KAFKA-17448:


Hey [~payang], this issue is very similar to KAFKA-17064, which you just fixed. 
If you have bandwidth, this would probably be very familiar to you already :).

> New consumer seek should update positions in background thread
> --
>
> Key: KAFKA-17448
> URL: https://issues.apache.org/jira/browse/KAFKA-17448
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.7.0, 3.8.0, 3.7.1
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
>
> In the new AsyncKafkaConsumer, a call to seek will update the positions in 
> subscription state for the assigned partitions in the app thread 
> ([https://github.com/apache/kafka/blob/c23b6b0365af5c58b76d8ad3fb628f766f95348f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L796])
> This could lead to race conditions like we've seen when subscription state 
> changes in the app thread (over a set of assigned partitions), that could 
> have been modified in the background thread, leading to errors on "No current 
> assignment for partition " 
> [https://github.com/apache/kafka/blob/c23b6b0365af5c58b76d8ad3fb628f766f95348f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L378]
>  
> Also, the positions update is moved to the background with KAFKA-17066 for the same 
> reason, so even if the assignment does not change, we could have a race 
> between the background setting positions to the committed offsets for 
> instance, and the app thread setting them manually via seek. 
> To avoid all of the above, we should have seek generate an event, send it to 
> the background, and then update the subscription state when processing that 
> event (similar to other API calls, e.g. assign with KAFKA-17064)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17448) New consumer seek should update positions in background thread

2024-08-30 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-17448:
---
Fix Version/s: 4.0.0

> New consumer seek should update positions in background thread
> --
>
> Key: KAFKA-17448
> URL: https://issues.apache.org/jira/browse/KAFKA-17448
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.7.0, 3.8.0, 3.7.1
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 4.0.0
>
>
> In the new AsyncKafkaConsumer, a call to seek will update the positions in 
> subscription state for the assigned partitions in the app thread 
> ([https://github.com/apache/kafka/blob/c23b6b0365af5c58b76d8ad3fb628f766f95348f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L796])
> This could lead to race conditions like we've seen when subscription state 
> changes in the app thread (over a set of assigned partitions), that could 
> have been modified in the background thread, leading to errors on "No current 
> assignment for partition " 
> [https://github.com/apache/kafka/blob/c23b6b0365af5c58b76d8ad3fb628f766f95348f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L378]
>  
> Also, the positions update is moved to the background with KAFKA-17066 for the same 
> reason, so even if the assignment does not change, we could have a race 
> between the background setting positions to the committed offsets for 
> instance, and the app thread setting them manually via seek. 
> To avoid all of the above, we should have seek generate an event, send it to 
> the background, and then update the subscription state when processing that 
> event (similar to other API calls, e.g. assign with KAFKA-17064)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17448) New consumer seek should update positions in background thread

2024-08-30 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-17448:
---
Component/s: clients

> New consumer seek should update positions in background thread
> --
>
> Key: KAFKA-17448
> URL: https://issues.apache.org/jira/browse/KAFKA-17448
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0, 3.8.0, 3.7.1
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 4.0.0
>
>
> In the new AsyncKafkaConsumer, a call to seek will update the positions in 
> subscription state for the assigned partitions in the app thread 
> ([https://github.com/apache/kafka/blob/c23b6b0365af5c58b76d8ad3fb628f766f95348f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L796])
> This could lead to race conditions like we've seen when subscription state 
> changes in the app thread (over a set of assigned partitions), that could 
> have been modified in the background thread, leading to errors on "No current 
> assignment for partition " 
> [https://github.com/apache/kafka/blob/c23b6b0365af5c58b76d8ad3fb628f766f95348f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L378]
>  
> Also, the positions update is moved to the background with KAFKA-17066 for the same 
> reason, so even if the assignment does not change, we could have a race 
> between the background setting positions to the committed offsets for 
> instance, and the app thread setting them manually via seek. 
> To avoid all of the above, we should have seek generate an event, send it to 
> the background, and then update the subscription state when processing that 
> event (similar to other API calls, e.g. assign with KAFKA-17064)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR Handle test re-runs in junit.py [kafka]

2024-08-30 Thread via GitHub


chia7712 commented on code in PR #17034:
URL: https://github.com/apache/kafka/pull/17034#discussion_r1738517170


##
.github/scripts/junit.py:
##
@@ -148,29 +168,73 @@ def pretty_time_duration(seconds: float) -> str:
 total_failures += suite.failures
 total_errors += suite.errors
 total_time += suite.time
-for test_failure in suite.test_failures:
+
+# Due to how the Develocity Test Retry plugin interacts with 
our generated ClusterTests, we can see
+# tests pass and then fail in the same run. Because of this, 
we need to capture all passed and all
+# failed for each suite. Then we can find flakes by taking the 
intersection of those two.
+all_suite_passed = {test.key() for test in suite.passed_tests}
+all_suite_failed = {test.key() for test in suite.failed_tests}
+flaky = all_suite_passed & all_suite_failed
+total_flaky += len(flaky)
+
+# Display failures first
+for test_failure in suite.failed_tests:
+if test_failure.key() in flaky:
+continue
 logger.debug(f"Found test failure: {test_failure}")
 simple_class_name = test_failure.class_name.split(".")[-1]
-table.append(("❌", simple_class_name, 
test_failure.test_name, test_failure.failure_message, 
f"{test_failure.time:0.2f}s"))
+failed_table.append((simple_class_name, 
test_failure.test_name, test_failure.failure_message, 
f"{test_failure.time:0.2f}s"))
+for test_failure in suite.failed_tests:
+if test_failure.key() not in flaky:
+continue
+logger.debug(f"Found flaky test: {test_failure}")
+simple_class_name = test_failure.class_name.split(".")[-1]
+flaky_table.append((simple_class_name, 
test_failure.test_name, test_failure.failure_message, 
f"{test_failure.time:0.2f}s"))
 for skipped_test in suite.skipped_tests:
 simple_class_name = skipped_test.class_name.split(".")[-1]
 logger.debug(f"Found skipped test: {skipped_test}")
-table.append(("⚠️", simple_class_name, 
skipped_test.test_name, "Skipped", ""))
+skipped_table.append((simple_class_name, 
skipped_test.test_name))
 duration = pretty_time_duration(total_time)
+logger.info(f"Finished processing {len(reports)} reports")
 
 # Print summary
 report_url = get_env("REPORT_URL")
 report_md = f"Download [HTML report]({report_url})."
-summary = f"{total_tests} tests run in {duration}, {total_failures} failed 
❌, {total_skipped} skipped ⚠️, {total_errors} errors."
-logger.debug(summary)
+summary = f"{total_tests} tests run in {duration}, {total_failures} 
{FAILED}, {total_flaky} {FLAKY}, {total_skipped} {SKIPPED}, and {total_errors} 
errors."

Review Comment:
   The tests shown in the "Failed Tests" table exclude the "flaky" ones. Would 
it confuse readers that the "number of failed" is NOT equal to the number of 
tests shown in the "Failed Tests" table?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Set `group.coordinator.rebalance.protocols` to `classic,consumer` by default [kafka]

2024-08-30 Thread via GitHub


dajac opened a new pull request, #17057:
URL: https://github.com/apache/kafka/pull/17057

   This patch changes the default configuration of 
`group.coordinator.rebalance.protocols` to `classic,consumer`. It also updates 
various tests that were specifying the new default value.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Set `group.coordinator.rebalance.protocols` to `classic,consumer` by default [kafka]

2024-08-30 Thread via GitHub


dajac commented on code in PR #17057:
URL: https://github.com/apache/kafka/pull/17057#discussion_r1738523290


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -576,19 +576,13 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
   throw new ConfigException(s"Disabling the '${GroupType.CLASSIC}' 
protocol is not supported.")
 }
 if (protocols.contains(GroupType.CONSUMER)) {
-  if (processRoles.isEmpty) {

Review Comment:
   I had to soften the validation here because ZK also has the new default 
value now. Keep in mind that ZK will be removed entirely in 4.0, so it won't be 
an issue for the next release.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Fix broken output layout of kafka-consumer-groups.sh [kafka]

2024-08-30 Thread via GitHub


sasakitoa opened a new pull request, #17058:
URL: https://github.com/apache/kafka/pull/17058

   The output format of `kafka-consumer-groups.sh` with `--delete-offsets` or 
`--reset-offsets` has been broken because line separators are missing.
   This PR will fix the problem.
   
   --
   Current: 
   ```
   $ kafka-consumer-groups.sh --bootstrap-server localhost:9092 
--delete-offsets --topic topic1 --group TestGroup1
   Request succeed for deleting offsets with topic topic1 group TestGroup1
   
   TOPIC  PARTITION   STATUS topic1 
0   Successful topic1 1 
  Successful topic1 2   
Successful topic1 3   Successful 
topic1 4   Successful topic1
 5   Successful topic1 6
   Successful topic1 7   
Successful topic1 8   Successful 
topic1 9   Successful 
   ```
   
   Expect:
   ```
   $ kafka-consumer-groups.sh --bootstrap-server localhost:9092 
--delete-offsets --topic topic1 --group TestGroup1 
   Request succeed for deleting offsets with topic topic1 group TestGroup1
   
   TOPIC  PARTITION   STATUS 
   topic1 0   Successful 
   topic1 1   Successful 
   topic1 2   Successful 
   topic1 3   Successful 
   topic1 4   Successful 
   topic1 5   Successful 
   topic1 6   Successful 
   topic1 7   Successful 
   topic1 8   Successful 
   topic1 9   Successful 
   ```
   
   --
   Current: 
   ```
   $ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets 
--from-file /tmp/offsets.csv --group TestGroup1 --execute
   
   GROUP  TOPIC  PARTITION  
NEW-OFFSET TestGroup1 topic1 1  
20 TestGroup1 topic1
 0  10 TestGroup1 topic1
 3  40 TestGroup1 topic1
 2  30 TestGroup1 
topic1 5  60 TestGroup1 
topic1 4  50 
   ```
   
   Expect:
   ```
   $ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets 
--from-file /tmp/offsets.csv --group TestGroup1 --execute
   
   GROUP  TOPIC  PARTITION  
NEW-OFFSET 
   TestGroup1 topic1 1  20  
   
   TestGroup1 topic1 0  10  
   
   TestGroup1 topic1 3  40  
   
   TestGroup1 topic1 2  30  
   
   TestGroup1 topic1 5  60  
   
   TestGroup1 topic1 4  50  
   
   ```
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17433 Add a deflake Github action [kafka]

2024-08-30 Thread via GitHub


chia7712 commented on code in PR #17019:
URL: https://github.com/apache/kafka/pull/17019#discussion_r1738533619


##
core/src/test/java/kafka/test/junit/ClusterTestExtensions.java:
##
@@ -199,46 +239,53 @@ private List 
processClusterTests(ExtensionContext
 return ret;
 }
 
-private List 
processClusterTest(ExtensionContext context, ClusterTest annot, 
ClusterTestDefaults defaults) {
-List ret = 
processClusterTestInternal(context, annot, defaults);
+private List processClusterTestInternal(
+ExtensionContext context,
+ClusterTest clusterTest,
+ClusterTestDefaults defaults
+) {
+Type[] types = clusterTest.types().length == 0 ? defaults.types() : 
clusterTest.types();
+Map serverProperties = 
Stream.concat(Arrays.stream(defaults.serverProperties()), 
Arrays.stream(clusterTest.serverProperties()))
+.filter(e -> e.id() == -1)
+.collect(Collectors.toMap(ClusterConfigProperty::key, 
ClusterConfigProperty::value, (a, b) -> b));
 
-if (ret.isEmpty()) {
-throw new IllegalStateException("processClusterTest method should 
provide at least one config");
-}
+Map> perServerProperties = 
Stream.concat(Arrays.stream(defaults.serverProperties()), 
Arrays.stream(clusterTest.serverProperties()))
+.filter(e -> e.id() != -1)
+.collect(Collectors.groupingBy(ClusterConfigProperty::id, 
Collectors.mapping(Function.identity(),
+Collectors.toMap(ClusterConfigProperty::key, 
ClusterConfigProperty::value, (a, b) -> b;
 
-return ret;
-}
-private List 
processClusterTestInternal(ExtensionContext context, ClusterTest annot, 
ClusterTestDefaults defaults) {
-Type[] types = annot.types().length == 0 ? defaults.types() : 
annot.types();
-Map serverProperties = 
Stream.concat(Arrays.stream(defaults.serverProperties()), 
Arrays.stream(annot.serverProperties()))
-.filter(e -> e.id() == -1)
-.collect(Collectors.toMap(ClusterConfigProperty::key, 
ClusterConfigProperty::value, (a, b) -> b));
+Map features = Arrays.stream(clusterTest.features())
+.collect(Collectors.toMap(ClusterFeature::feature, 
ClusterFeature::version));
 
-Map> perServerProperties = 
Stream.concat(Arrays.stream(defaults.serverProperties()), 
Arrays.stream(annot.serverProperties()))
-.filter(e -> e.id() != -1)
-.collect(Collectors.groupingBy(ClusterConfigProperty::id, 
Collectors.mapping(Function.identity(),
-Collectors.toMap(ClusterConfigProperty::key, 
ClusterConfigProperty::value, (a, b) -> b;
+ClusterConfig config = ClusterConfig.builder()
+.setTypes(new HashSet<>(Arrays.asList(types)))
+.setBrokers(clusterTest.brokers() == 0 ? defaults.brokers() : 
clusterTest.brokers())
+.setControllers(clusterTest.controllers() == 0 ? 
defaults.controllers() : clusterTest.controllers())
+.setDisksPerBroker(clusterTest.disksPerBroker() == 0 ? 
defaults.disksPerBroker() : clusterTest.disksPerBroker())
+.setAutoStart(clusterTest.autoStart() == AutoStart.DEFAULT ? 
defaults.autoStart() : clusterTest.autoStart() == AutoStart.YES)
+.setListenerName(clusterTest.listener().trim().isEmpty() ? null : 
clusterTest.listener())
+.setServerProperties(serverProperties)
+.setPerServerProperties(perServerProperties)
+.setSecurityProtocol(clusterTest.securityProtocol())
+.setMetadataVersion(clusterTest.metadataVersion())
+.setTags(Arrays.asList(clusterTest.tags()))
+.setFeatures(features)
+.build();
+
+return Arrays.stream(types)
+.map(type -> 
type.invocationContexts(context.getRequiredTestMethod().getName(), config))
+.collect(Collectors.toList());
+}
 
-Map features = Arrays.stream(annot.features())
-.collect(Collectors.toMap(ClusterFeature::feature, 
ClusterFeature::version));
+Stream repeatedClusterTests(int repeatCount, ClusterTest[] 
clusterTestAnnots) {

Review Comment:
   this is unused now.



##
.github/README.md:
##
@@ -0,0 +1,40 @@
+# GitHub Actions
+
+## Overview
+
+The entry point for our build is the "CI" workflow which is defined in ci.yml.
+This is used for both PR and trunk builds. The jobs and steps of the workflow
+are defined in build.yml.
+
+## Opting-in to GitHub Actions
+
+To opt-in to the new GitHub actions workflows, simply name your branch with a
+prefix of "gh-". For example, `gh-KAFKA-17433-deflake`
+
+## Disabling Email Notifications
+
+By default, GitHub sends an email for each failed action run. To change this,
+visit https://github.com/settings/notifications and find System -> Actions.
+Here you can change your notification preferences.
+
+## Publishing Build Scans
+
+> This only works for com

Re: [PR] MINOR: Set `group.coordinator.rebalance.protocols` to `classic,consumer` by default [kafka]

2024-08-30 Thread via GitHub


chia7712 commented on code in PR #17057:
URL: https://github.com/apache/kafka/pull/17057#discussion_r1738568876


##
core/src/test/java/kafka/test/ClusterInstance.java:
##
@@ -160,15 +159,11 @@ default Admin createAdminClient() {
 }
 
 default Set supportedGroupProtocols() {
-Map serverProperties = config().serverProperties();
-Set supportedGroupProtocols = new HashSet<>();
-supportedGroupProtocols.add(CLASSIC);
-
-if 
(serverProperties.getOrDefault(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, 
"").contains("consumer")) {
-supportedGroupProtocols.add(CONSUMER);
+if (isKRaftTest()) {

Review Comment:
   The `CONSUMER` protocol can be disabled even in KRaft mode, right? 
Maybe we can check `KafkaApis` directly. For example:
   ```java
   default Set supportedGroupProtocols() {
   if (brokers().values().stream().allMatch(b -> 
b.dataPlaneRequestProcessor().isConsumerGroupProtocolEnabled())) {
   return mkSet(CLASSIC, CONSUMER);
   } else {
   return Collections.singleton(CLASSIC);
   }
   }
   ```



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##
@@ -76,9 +76,9 @@ public class GroupCoordinatorConfig {
 public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG = 
"group.coordinator.rebalance.protocols";
 public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC = 
"The list of enabled rebalance protocols. Supported protocols: " +
 
Arrays.stream(Group.GroupType.values()).map(Group.GroupType::toString).collect(Collectors.joining(","))
 + ". " +
-"The " + Group.GroupType.CONSUMER + " rebalance protocol is in 
preview and therefore must not be used in production. " +
 "The " + Group.GroupType.SHARE + " rebalance protocol is in early 
access and therefore must not be used in production.";
-public static final List 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = 
Collections.singletonList(Group.GroupType.CLASSIC.toString());
+public static final List 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT =
+Arrays.asList(Group.GroupType.CLASSIC.toString(), 
Group.GroupType.CONSUMER.toString());

Review Comment:
   Please make it immutable.



##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -576,19 +576,13 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
   throw new ConfigException(s"Disabling the '${GroupType.CLASSIC}' 
protocol is not supported.")
 }
 if (protocols.contains(GroupType.CONSUMER)) {
-  if (processRoles.isEmpty) {
-throw new ConfigException(s"The new '${GroupType.CONSUMER}' rebalance 
protocol is only supported in KRaft cluster.")
-  }
-  if (!isNewGroupCoordinatorEnabled) {
-throw new ConfigException(s"The new '${GroupType.CONSUMER}' rebalance 
protocol is only supported by the new group coordinator.")
+  if (processRoles.isEmpty || !isNewGroupCoordinatorEnabled) {
+warn(s"The new '${GroupType.CONSUMER}' rebalance protocol is only 
supported in KRaft cluster with the new group coordinator.")

Review Comment:
   Maybe `KafkaApis#isConsumerGroupProtocolEnabled` needs a similar log when 
`isConsumerRebalanceProtocolSupported` returns false and the `CONSUMER` 
protocol is enabled.
   
   
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L3816



##
core/src/test/java/kafka/test/ClusterTestExtensionsTest.java:
##
@@ -191,45 +189,17 @@ public void testDefaults(ClusterInstance clusterInstance) 
{
 Assertions.assertEquals(MetadataVersion.latestTesting(), 
clusterInstance.config().metadataVersion());
 }
 
-@ClusterTests({
-@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
-@ClusterConfigProperty(key = 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
-}),
-@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
-@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, 
value = "true"),
-@ClusterConfigProperty(key = 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
-})
-})
+@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT})
 public void testSupportedNewGroupProtocols(ClusterInstance 
clusterInstance) {
 Set supportedGroupProtocols = new HashSet<>();
 supportedGroupProtocols.add(CLASSIC);
 supportedGroupProtocols.add(CONSUMER);
-
Assertions.assertTrue(clusterInstance.supportedGroupProtocols().containsAll(supportedGroupProtocols));
-Assertions.assertEquals(2, 
clusterInstance.supportedGroupProtocols().size());
+Assertions.assertEquals(supportedGroupProtocols, 
clusterInstance.supportedGroupProtocols());
 }
 
-@Cluster

Re: [PR] MINOR: add helper function for clusterInstance [kafka]

2024-08-30 Thread via GitHub


chia7712 commented on code in PR #16852:
URL: https://github.com/apache/kafka/pull/16852#discussion_r1738595734


##
core/src/test/java/kafka/test/ClusterTestExtensionsTest.java:
##
@@ -236,6 +236,20 @@ public void 
testNotSupportedNewGroupProtocols(ClusterInstance clusterInstance) {
 Assertions.assertEquals(1, 
clusterInstance.supportedGroupProtocols().size());
 }
 
+
+
+@ClusterTest(types = {Type.ZK, Type.CO_KRAFT, Type.KRAFT}, brokers = 3)
+public void testCreateTopic(ClusterInstance clusterInstance) throws 
Exception {
+String topicName = "test";
+int partitions = 3;
+short replicas = 3;
+clusterInstance.createTopic(topicName, partitions, replicas);
+
+try (Admin admin = clusterInstance.createAdminClient()) {
+
Assertions.assertTrue(admin.listTopics().listings().get().stream().anyMatch(s 
-> s.name().equals(topicName)));

Review Comment:
   Please check the partitions and replicas as well.
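
   Concretely, something like this (a sketch using the standard Admin API; `allTopicNames()` is the newer accessor, older clients use `all()`):
   
   ```java
   try (Admin admin = clusterInstance.createAdminClient()) {
       TopicDescription description = admin
           .describeTopics(Collections.singleton(topicName))
           .allTopicNames().get()
           .get(topicName);
       // Verify the partition count and the per-partition replication factor.
       Assertions.assertEquals(partitions, description.partitions().size());
       description.partitions().forEach(p ->
           Assertions.assertEquals(replicas, (short) p.replicas().size()));
   }
   ```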



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Set `group.coordinator.rebalance.protocols` to `classic,consumer` by default [kafka]

2024-08-30 Thread via GitHub


dajac commented on code in PR #17057:
URL: https://github.com/apache/kafka/pull/17057#discussion_r1738609128


##
core/src/test/java/kafka/test/ClusterInstance.java:
##
@@ -160,15 +159,11 @@ default Admin createAdminClient() {
 }
 
 default Set supportedGroupProtocols() {
-Map serverProperties = config().serverProperties();
-Set supportedGroupProtocols = new HashSet<>();
-supportedGroupProtocols.add(CLASSIC);
-
-if 
(serverProperties.getOrDefault(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, 
"").contains("consumer")) {
-supportedGroupProtocols.add(CONSUMER);
+if (isKRaftTest()) {

Review Comment:
   Good idea. Let me try this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Set `group.coordinator.rebalance.protocols` to `classic,consumer` by default [kafka]

2024-08-30 Thread via GitHub


dajac commented on code in PR #17057:
URL: https://github.com/apache/kafka/pull/17057#discussion_r1738610295


##
core/src/test/java/kafka/test/ClusterTestExtensionsTest.java:
##
@@ -191,45 +189,17 @@ public void testDefaults(ClusterInstance clusterInstance) 
{
 Assertions.assertEquals(MetadataVersion.latestTesting(), 
clusterInstance.config().metadataVersion());
 }
 
-@ClusterTests({
-@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
-@ClusterConfigProperty(key = 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
-}),
-@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
-@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, 
value = "true"),
-@ClusterConfigProperty(key = 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
-})
-})
+@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT})
 public void testSupportedNewGroupProtocols(ClusterInstance 
clusterInstance) {
 Set supportedGroupProtocols = new HashSet<>();
 supportedGroupProtocols.add(CLASSIC);
 supportedGroupProtocols.add(CONSUMER);
-
Assertions.assertTrue(clusterInstance.supportedGroupProtocols().containsAll(supportedGroupProtocols));
-Assertions.assertEquals(2, 
clusterInstance.supportedGroupProtocols().size());
+Assertions.assertEquals(supportedGroupProtocols, 
clusterInstance.supportedGroupProtocols());
 }
 
-@ClusterTests({
-@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, 
serverProperties = {
-@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, 
value = "true"),
-}),
-@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, 
serverProperties = {
-@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, 
value = "false"),
-}),
-@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, 
serverProperties = {

Review Comment:
   It did not work with my implementation but it would work with yours. Let me 
bring it back.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Set `group.coordinator.rebalance.protocols` to `classic,consumer` by default [kafka]

2024-08-30 Thread via GitHub


dajac commented on code in PR #17057:
URL: https://github.com/apache/kafka/pull/17057#discussion_r1738610851


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java:
##
@@ -79,34 +77,34 @@ private ConsumerGroupCommandTestUtils() {
 }
 
 static List<ClusterConfig> generator() {
-return Stream.concat(forConsumerGroupCoordinator().stream(), forClassicGroupCoordinator().stream())
-.collect(Collectors.toList());
+return Stream
+.concat(forKRaftGroupCoordinator().stream(), forZkGroupCoordinator().stream())
+.collect(Collectors.toList());
 }
 
-static List<ClusterConfig> forConsumerGroupCoordinator() {
+static List<ClusterConfig> forKRaftGroupCoordinator() {
 Map<String, String> serverProperties = new HashMap<>();
 serverProperties.put(OFFSETS_TOPIC_PARTITIONS_CONFIG, "1");
 serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
 serverProperties.put(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true");
Review Comment:
   I was debating whether I should remove those in this PR. I can do it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Set `group.coordinator.rebalance.protocols` to `classic,consumer` by default [kafka]

2024-08-30 Thread via GitHub


dajac commented on code in PR #17057:
URL: https://github.com/apache/kafka/pull/17057#discussion_r1738612665


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -576,19 +576,13 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
   throw new ConfigException(s"Disabling the '${GroupType.CLASSIC}' protocol is not supported.")
 }
 if (protocols.contains(GroupType.CONSUMER)) {
-  if (processRoles.isEmpty) {
-    throw new ConfigException(s"The new '${GroupType.CONSUMER}' rebalance protocol is only supported in KRaft cluster.")
-  }
-  if (!isNewGroupCoordinatorEnabled) {
-    throw new ConfigException(s"The new '${GroupType.CONSUMER}' rebalance protocol is only supported by the new group coordinator.")
+  if (processRoles.isEmpty || !isNewGroupCoordinatorEnabled) {
+    warn(s"The new '${GroupType.CONSUMER}' rebalance protocol is only supported in KRaft cluster with the new group coordinator.")

Review Comment:
   Hum... This is too much in my opinion because it would log on every call from the consumers. Having a warning at the beginning seems enough for me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-17428) remote segments deleted in RLMCopyTask stays `COPY_SEGMENT_START` state

2024-08-30 Thread Federico Valeri (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Federico Valeri updated KAFKA-17428:

Labels: kip-required  (was: )

> remote segments deleted in RLMCopyTask stays `COPY_SEGMENT_START` state
> ---
>
> Key: KAFKA-17428
> URL: https://issues.apache.org/jira/browse/KAFKA-17428
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Federico Valeri
>Priority: Major
>  Labels: kip-required
>
> Currently, in copyLogSegment in RLMCopyTask, we delete segments whose upload 
> failed and segments whose custom metadata size was exceeded. But after 
> deletion, these segments are still in the COPY_SEGMENT_STARTED state. That 
> "might" cause unexpected issues in the future. We'd better move the state from 
> {{COPY_SEGMENT_STARTED}} -> {{DELETE_SEGMENT_STARTED}} -> 
> {{DELETE_SEGMENT_FINISHED}}
>  
> updated:
> I thought about this when I first had a look at it and one thing that 
> bothered me is that {{DELETE_SEGMENT_STARTED}} means to me that we're now in 
> a state where we attempt deletion. However if the remote store is down and we 
> fail to copy and delete we will leave that segment in 
> {{DELETE_SEGMENT_STARTED}} and not attempt to delete it till the segment 
> itself breaches retention.ms/bytes.
> We can probably just make it clearer but that was my thought at the time.
> So, maybe when in deletion loop, we can add {{DELETE_SEGMENT_STARTED}} 
> segments into deletion directly, but that also needs to consider the 
> retention size calculation.
>  
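> 
> For readers following along, the states above come from the remote-storage segment lifecycle. A condensed sketch (the constants mirror Kafka's RemoteLogSegmentState enum; the transition comment is only this issue's proposal, not current behavior):
> {code:java}
> // Condensed from org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.
> enum RemoteLogSegmentState {
>     COPY_SEGMENT_STARTED,    // upload to remote storage began
>     COPY_SEGMENT_FINISHED,   // upload completed successfully
>     DELETE_SEGMENT_STARTED,  // deletion from remote storage began
>     DELETE_SEGMENT_FINISHED  // deletion completed successfully
> }
> // Proposed transition for segments whose upload failed:
> // COPY_SEGMENT_STARTED -> DELETE_SEGMENT_STARTED -> DELETE_SEGMENT_FINISHED
> {code}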



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] Kafka 12822 2 [kafka]

2024-08-30 Thread via GitHub


pegasas opened a new pull request, #17059:
URL: https://github.com/apache/kafka/pull/17059

   KAFKA-12829: Remove deprecated StreamsBuilder#addGlobalStore of old 
Processor API
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-17449) Move Quota classes to server module

2024-08-30 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-17449:
--

 Summary: Move Quota classes to server module
 Key: KAFKA-17449
 URL: https://issues.apache.org/jira/browse/KAFKA-17449
 Project: Kafka
  Issue Type: Sub-task
Reporter: Mickael Maison
Assignee: Mickael Maison






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Kafka 12822 2 [kafka]

2024-08-30 Thread via GitHub


pegasas commented on PR #17059:
URL: https://github.com/apache/kafka/pull/17059#issuecomment-2321259947

   
![image](https://github.com/user-attachments/assets/37b1c76a-11c6-4d19-bd77-3c55f00904c2)
   
   CI passed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16379: Coordinator event queue, processing, flush, purgatory time histograms [kafka]

2024-08-30 Thread via GitHub


jeffkbkim commented on PR #16949:
URL: https://github.com/apache/kafka/pull/16949#issuecomment-2321260689

   The HdrHistogram wrapper implementation (HdrHistogram, KafkaMetricHistogram) 
was authored by @dimitarndimitrov  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-17450) Optimise the handler methods in ShareConsumeRequestManager.

2024-08-30 Thread Shivsundar R (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shivsundar R reassigned KAFKA-17450:


Assignee: Shivsundar R

> Optimise the handler methods in ShareConsumeRequestManager.
> ---
>
> Key: KAFKA-17450
> URL: https://issues.apache.org/jira/browse/KAFKA-17450
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Shivsundar R
>Assignee: Shivsundar R
>Priority: Major
>
> Currently there are 4 handler functions for ShareAcknowledge responses. 
> Instead, by using AcknowledgeRequestType, we could merge the code and have only 2 
> handler functions: one for ShareAcknowledge success and one for 
> ShareAcknowledge failure.
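> 
> A rough sketch of the merged shape being described; the enum values and signatures are illustrative guesses from this description, not the actual ShareConsumeRequestManager internals:
> {code:java}
> enum AcknowledgeRequestType { COMMIT_SYNC, COMMIT_ASYNC, CLOSE }
> 
> final class AcknowledgeHandlers {
>     // One success handler keyed on the request type...
>     void onShareAcknowledgeSuccess(AcknowledgeRequestType type) {
>         switch (type) {
>             case COMMIT_SYNC  -> { /* complete the caller's blocking future */ }
>             case COMMIT_ASYNC -> { /* fire the async commit callback */ }
>             case CLOSE        -> { /* finish the close sequence */ }
>         }
>     }
> 
>     // ...and one failure handler, replacing four near-duplicate methods.
>     void onShareAcknowledgeFailure(AcknowledgeRequestType type, Throwable error) {
>         // shared error propagation, parameterized by request type
>     }
> }
> {code}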



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17450) Optimise the handler methods in ShareConsumeRequestManager.

2024-08-30 Thread Shivsundar R (Jira)
Shivsundar R created KAFKA-17450:


 Summary: Optimise the handler methods in 
ShareConsumeRequestManager.
 Key: KAFKA-17450
 URL: https://issues.apache.org/jira/browse/KAFKA-17450
 Project: Kafka
  Issue Type: Sub-task
Reporter: Shivsundar R


Currently there are 4 handler functions for ShareAcknowledge responses. Instead, 
by using AcknowledgeRequestType, we could merge the code and have only 2 handler 
functions: one for ShareAcknowledge success and one for ShareAcknowledge 
failure.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17448) New consumer seek should update positions in background thread

2024-08-30 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878122#comment-17878122
 ] 

PoAn Yang commented on KAFKA-17448:
---

Hi [~lianetm], thank you. I can handle it. 👍

> New consumer seek should update positions in background thread
> --
>
> Key: KAFKA-17448
> URL: https://issues.apache.org/jira/browse/KAFKA-17448
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0, 3.8.0, 3.7.1
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 4.0.0
>
>
> In the new AsyncKafkaConsumer, a call to seek will update the positions in 
> subscription state for the assigned partitions in the app thread 
> ([https://github.com/apache/kafka/blob/c23b6b0365af5c58b76d8ad3fb628f766f95348f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L796])
> This could lead to race conditions like we've seen when subscription state 
> changes in the app thread (over a set of assigned partitions), that could 
> have been modified in the background thread, leading to errors on "No current 
> assignment for partition " 
> [https://github.com/apache/kafka/blob/c23b6b0365af5c58b76d8ad3fb628f766f95348f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L378]
>  
> Also, the positions update is moved to the background with KAFKA-17066 for the same 
> reason, so even if the assignment does not change, we could have a race 
> between the background setting positions to the committed offsets for 
> instance, and the app thread setting them manually via seek. 
> To avoid all of the above, we should have seek generate an event, send it to 
> the background, and then update the subscription state when processing that 
> event (similar to other api calls, ex, assign with KAFKA-17064)
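> 
> A minimal self-contained sketch of the proposed event hand-off, assuming an illustrative SeekEvent type and queue wiring rather than the real AsyncKafkaConsumer internals:
> {code:java}
> import java.util.concurrent.BlockingQueue;
> import java.util.concurrent.LinkedBlockingQueue;
> 
> // The app thread only enqueues; the background thread is the single writer of
> // positions, so seek can no longer race with reconciliation or the
> // committed-offset refresh.
> public class SeekViaBackgroundThread {
>     record SeekEvent(String topic, int partition, long offset) { }
> 
>     private final BlockingQueue<SeekEvent> applicationEvents = new LinkedBlockingQueue<>();
> 
>     // Called from the application thread: never touches subscription state.
>     public void seek(String topic, int partition, long offset) {
>         applicationEvents.add(new SeekEvent(topic, partition, offset));
>     }
> 
>     // Background thread loop: where the real consumer would update
>     // SubscriptionState (e.g. seekUnvalidated) while processing the event.
>     public void processOneEvent() throws InterruptedException {
>         SeekEvent event = applicationEvents.take();
>         System.out.printf("seek %s-%d to %d%n", event.topic(), event.partition(), event.offset());
>     }
> }
> {code}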



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-17448) New consumer seek should update positions in background thread

2024-08-30 Thread PoAn Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

PoAn Yang reassigned KAFKA-17448:
-

Assignee: PoAn Yang

> New consumer seek should update positions in background thread
> --
>
> Key: KAFKA-17448
> URL: https://issues.apache.org/jira/browse/KAFKA-17448
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0, 3.8.0, 3.7.1
>Reporter: Lianet Magrans
>Assignee: PoAn Yang
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 4.0.0
>
>
> In the new AsyncKafkaConsumer, a call to seek will update the positions in 
> subscription state for the assigned partitions in the app thread 
> ([https://github.com/apache/kafka/blob/c23b6b0365af5c58b76d8ad3fb628f766f95348f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L796])
> This could lead to race conditions like we've seen when subscription state 
> changes in the app thread (over a set of assigned partitions), that could 
> have been modified in the background thread, leading to errors on "No current 
> assignment for partition " 
> [https://github.com/apache/kafka/blob/c23b6b0365af5c58b76d8ad3fb628f766f95348f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L378]
>  
> Also, the positions update is moved to the background with KAFKA-17066 for the same 
> reason, so even if the assignment does not change, we could have a race 
> between the background setting positions to the committed offsets for 
> instance, and the app thread setting them manually via seek. 
> To avoid all of the above, we should have seek generate an event, send it to 
> the background, and then update the subscription state when processing that 
> event (similar to other api calls, ex, assign with KAFKA-17064)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17448) New consumer seek should update positions in background thread

2024-08-30 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878124#comment-17878124
 ] 

Lianet Magrans commented on KAFKA-17448:


Thanks! Let me know if you have questions or when you need help with reviews. 

> New consumer seek should update positions in background thread
> --
>
> Key: KAFKA-17448
> URL: https://issues.apache.org/jira/browse/KAFKA-17448
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0, 3.8.0, 3.7.1
>Reporter: Lianet Magrans
>Assignee: PoAn Yang
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 4.0.0
>
>
> In the new AsyncKafkaConsumer, a call to seek will update the positions in 
> subscription state for the assigned partitions in the app thread 
> ([https://github.com/apache/kafka/blob/c23b6b0365af5c58b76d8ad3fb628f766f95348f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L796])
> This could lead to race conditions like we've seen when subscription state 
> changes in the app thread (over a set of assigned partitions), that could 
> have been modified in the background thread, leading to errors on "No current 
> assignment for partition " 
> [https://github.com/apache/kafka/blob/c23b6b0365af5c58b76d8ad3fb628f766f95348f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L378]
>  
> Also, the positions update is moved to the background with KAFKA-17066 for the same 
> reason, so even if the assignment does not change, we could have a race 
> between the background setting positions to the committed offsets for 
> instance, and the app thread setting them manually via seek. 
> To avoid all of the above, we should have seek generate an event, send it to 
> the background, and then update the subscription state when processing that 
> event (similar to other api calls, ex, assign with KAFKA-17064)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: add ReconfigurableQuorumIntegrationTest [kafka]

2024-08-30 Thread via GitHub


chia7712 commented on code in PR #16991:
URL: https://github.com/apache/kafka/pull/16991#discussion_r1738650234


##
metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java:
##
@@ -401,7 +409,7 @@ void doFormat(BootstrapMetadata bootstrapMetadata) throws Exception {
 Map<String, DirectoryType> directoryTypes = new HashMap<>();
 for (String emptyLogDir : ensemble.emptyLogDirs()) {
 DirectoryType directoryType = DirectoryType.calculate(emptyLogDir,
-metadataLogDirectory,
+metadataLogDirectory.orElseGet(() -> ""),

Review Comment:
   `orElse("")` is good enough :smile: 
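   (As general Java background, not a point specific to this PR: `orElseGet` only pays off when the fallback is costly to compute, since the supplier defers that work; for a constant, the eager `orElse` is the simpler choice.)
   ```java
   import java.util.Optional;
   
   class OrElseDemo {
       static String metadataDir(Optional<String> metadataLogDirectory) {
           // Constant fallback: nothing to defer, so orElse reads better.
           return metadataLogDirectory.orElse("");
           // metadataLogDirectory.orElseGet(() -> "") would add a supplier for no gain.
       }
   }
   ```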



##
core/src/test/java/kafka/testkit/KafkaClusterTestKit.java:
##
@@ -404,22 +423,44 @@ private void formatNode(
 boolean writeMetadataDirectory
 ) {
 try {
-MetaPropertiesEnsemble.Copier copier =
-    new MetaPropertiesEnsemble.Copier(MetaPropertiesEnsemble.EMPTY);
-for (Entry<String, MetaProperties> entry : ensemble.logDirProps().entrySet()) {
-    String logDir = entry.getKey();
-    if (writeMetadataDirectory || (!ensemble.metadataLogDir().equals(Optional.of(logDir)))) {
-        log.trace("Adding {} to the list of directories to format.", logDir);
-        copier.setLogDirProps(logDir, entry.getValue());
+Formatter formatter = new Formatter();
+formatter.setNodeId(ensemble.nodeId().getAsInt());
+formatter.setClusterId(ensemble.clusterId().get());
+if (writeMetadataDirectory) {
+    formatter.setDirectories(ensemble.logDirProps().keySet());
+} else {
+    formatter.setDirectories(ensemble.logDirProps().keySet().stream().
+        filter(d -> !ensemble.metadataLogDir().get().equals(d)).
+        collect(Collectors.toSet()));
+}
+if (formatter.directories().isEmpty()) {
+    return;
+}
+formatter.setReleaseVersion(nodes.bootstrapMetadata().metadataVersion());
+formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME,
+    nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME));
+formatter.setUnstableFeatureVersionsEnabled(true);
+formatter.setIgnoreFormatted(false);
+formatter.setControllerListenerName("CONTROLLER");
+if (writeMetadataDirectory) {
+    formatter.setMetadataLogDirectory(ensemble.metadataLogDir().get());
+} else {
+    formatter.setMetadataLogDirectory(Optional.empty());

Review Comment:
   the default value is empty, so maybe we can remove it.



##
metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java:
##
@@ -71,10 +72,50 @@ public void testFromRecordsListWithoutMetadataVersion() {
 () -> BootstrapMetadata.fromRecords(emptyList(), "quux")).getMessage());
 }
 
+private static final ApiMessageAndVersion MV_10 =
+    new ApiMessageAndVersion(new FeatureLevelRecord().
+        setName(FEATURE_NAME).
+        setFeatureLevel((short) 10), (short) 0);
+
+private static final ApiMessageAndVersion MV_11 =

Review Comment:
   This is unused. Maybe we should add a UT for it?
   ```java
   assertEquals((short) 11, BootstrapMetadata.
       fromRecords(Arrays.asList(MV_10, MV_11), "src").featureLevel(FEATURE_NAME));
   ```



##
core/src/main/java/kafka/server/ServerSocketFactory.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.network.Selectable;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.nio.channels.ServerSocketChannel;
+
+public interface ServerSocketFactory {
+ServerSocketChannel openServerSocket(
+String listenerName,
+InetSocketAddress socketAddress,
+int listenBacklogSize,
+int recvBufferSize
+) thro

Re: [PR] MINOR: Set `group.coordinator.rebalance.protocols` to `classic,consumer` by default [kafka]

2024-08-30 Thread via GitHub


chia7712 commented on code in PR #17057:
URL: https://github.com/apache/kafka/pull/17057#discussion_r1738713919


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -576,19 +576,13 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
   throw new ConfigException(s"Disabling the '${GroupType.CLASSIC}' protocol is not supported.")
 }
 if (protocols.contains(GroupType.CONSUMER)) {
-  if (processRoles.isEmpty) {
-    throw new ConfigException(s"The new '${GroupType.CONSUMER}' rebalance protocol is only supported in KRaft cluster.")
-  }
-  if (!isNewGroupCoordinatorEnabled) {
-    throw new ConfigException(s"The new '${GroupType.CONSUMER}' rebalance protocol is only supported by the new group coordinator.")
+  if (processRoles.isEmpty || !isNewGroupCoordinatorEnabled) {
+    warn(s"The new '${GroupType.CONSUMER}' rebalance protocol is only supported in KRaft cluster with the new group coordinator.")

Review Comment:
   > it would log on every call
   
   you are right :100: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-17449: Move Quota classes to server module [kafka]

2024-08-30 Thread via GitHub


mimaison opened a new pull request, #17060:
URL: https://github.com/apache/kafka/pull/17060

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-17450: Reduced the handlers for handling ShareAcknowledgeResponse. [kafka]

2024-08-30 Thread via GitHub


ShivsundarR opened a new pull request, #17061:
URL: https://github.com/apache/kafka/pull/17061

   *What*
   Currently there are 4 handler functions present for handling ShareAcknowledge responses. ShareConsumeRequestManager had an interface that the respective handlers would implement. Instead of having 4 different handlers for this, by using AcknowledgeRequestType we can merge the code into only 2 handler functions, one for ShareAcknowledge success and one for ShareAcknowledge failure, eliminating the need for the interface.
   
   This PR also fixes a bug: while handling the ShareAcknowledge response we were using an outdated timestamp rather than the time at which the response was received.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17347: Add missing client-metrics option to kafka-configs.sh [kafka]

2024-08-30 Thread via GitHub


AndrewJSchofield commented on PR #17046:
URL: https://github.com/apache/kafka/pull/17046#issuecomment-2321352952

   Only 3 unit test failures, unrelated to this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-17449) Move Quota classes to server-common module

2024-08-30 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-17449:
---
Summary: Move Quota classes to server-common module  (was: Move Quota 
classes to server module)

> Move Quota classes to server-common module
> --
>
> Key: KAFKA-17449
> URL: https://issues.apache.org/jira/browse/KAFKA-17449
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17449) Move Quota classes to server-common module

2024-08-30 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-17449:
---
Description: RLMQuotaManager, which will ultimately move to storage, depends 
on the QuotaType and QuotaUtils classes, so it makes sense to put them in 
server-common instead of server.

> Move Quota classes to server-common module
> --
>
> Key: KAFKA-17449
> URL: https://issues.apache.org/jira/browse/KAFKA-17449
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
>
> RLMQuotaManager, which will ultimately move to storage, depends on the 
> QuotaType and QuotaUtils classes, so it makes sense to put them in 
> server-common instead of server.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-17386: Remove broker-list, threads, num-fetch-threads in ConsumerPerformance [kafka]

2024-08-30 Thread via GitHub


chia7712 merged PR #16983:
URL: https://github.com/apache/kafka/pull/16983


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-17386) Remove broker-list, threads, num-fetch-threads in ConsumerPerformance

2024-08-30 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-17386.

Fix Version/s: 4.0.0
   Resolution: Fixed

> Remove broker-list, threads, num-fetch-threads in ConsumerPerformance
> -
>
> Key: KAFKA-17386
> URL: https://issues.apache.org/jira/browse/KAFKA-17386
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Minor
> Fix For: 4.0.0
>
>
> The 
> [broker-list|https://github.com/apache/kafka/blob/944c1353a925858ea9bd9024a713cd7301f55133/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java#L268C45-L271],
>  
> [threads|https://github.com/apache/kafka/blob/944c1353a925858ea9bd9024a713cd7301f55133/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java#L298-L302],
>  
> [num-fetch-threads|https://github.com/apache/kafka/blob/944c1353a925858ea9bd9024a713cd7301f55133/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java#L303-L307]
> are deprecated options in ConsumerPerformance. We can consider removing 
> them in 4.0.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-499+-+Unify+connection+name+flag+for+command+line+tool
> https://issues.apache.org/jira/browse/KAFKA-10126



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-17450: Reduced the handlers for handling ShareAcknowledgeResponse. [kafka]

2024-08-30 Thread via GitHub


AndrewJSchofield commented on code in PR #17061:
URL: https://github.com/apache/kafka/pull/17061#discussion_r1738762496


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##
@@ -613,74 +607,100 @@ private void handleShareAcknowledgeSuccess(Node fetchTarget,
                                            ShareAcknowledgeRequestData requestData,
                                            AcknowledgeRequestState acknowledgeRequestState,
                                            ClientResponse resp,
-                                           long currentTimeMs) {
+                                           long responseCompletionTimeMs) {
 try {
+//acknowledgeRequestState.handleShareAcknowledgeSuccess(resp);

Review Comment:
   This comment seems spurious.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16863 : Deprecate default exception handlers [kafka]

2024-08-30 Thread via GitHub


muralibasani commented on PR #17005:
URL: https://github.com/apache/kafka/pull/17005#issuecomment-2321425276

   > Committed the fix.
   > 
   > LGTM now, assuming build passes...
   
   Great, thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16863 : Deprecate default exception handlers [kafka]

2024-08-30 Thread via GitHub


muralibasani commented on PR #17005:
URL: https://github.com/apache/kafka/pull/17005#issuecomment-2321453957

   there are failing tests


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17137[part-5]: Ensure Admin APIs are properly tested [kafka]

2024-08-30 Thread via GitHub


m1a2st commented on code in PR #16905:
URL: https://github.com/apache/kafka/pull/16905#discussion_r1738862139


##
core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala:
##
@@ -337,6 +338,53 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
 assertFutureExceptionTypeEquals(results.values.get(emptyResourceNameAcl), classOf[InvalidRequestException])
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateDelegationTokenWithLargeTimeout(quorum: String): Unit = {
+    client = createAdminClient
+    val timeout = Long.MaxValue
+
+    val options = new CreateDelegationTokenOptions().maxlifeTimeMs(timeout)
+    val token = client.createDelegationToken(options).delegationToken().get()
+
+    assertEquals(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_MS_DEFAULT, token.tokenInfo.maxTimestamp - token.tokenInfo.issueTimestamp)
+    assertTrue(token.tokenInfo.maxTimestamp >= token.tokenInfo.expiryTimestamp)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateDelegationTokenWithNegativeTimeout(quorum: String): Unit = {
+    client = createAdminClient
+    val timeout = -1
+
+    val options = new CreateDelegationTokenOptions().maxlifeTimeMs(timeout)
+    val token = client.createDelegationToken(options).delegationToken().get()
+
+    assertEquals(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_MS_DEFAULT, token.tokenInfo.maxTimestamp - token.tokenInfo.issueTimestamp)
+    assertTrue(token.tokenInfo.maxTimestamp >= token.tokenInfo.expiryTimestamp)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testExpiredTimeStampLargerThanMaxLifeStamp(quorum: String): Unit = {
+    client = createAdminClient
+    val timeout = -1
+
+    val createOptions = new CreateDelegationTokenOptions().maxlifeTimeMs(timeout)
+    val token = client.createDelegationToken(createOptions).delegationToken().get()
+
+    assertEquals(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_MS_DEFAULT, token.tokenInfo.maxTimestamp - token.tokenInfo.issueTimestamp)
+    assertTrue(token.tokenInfo.maxTimestamp >= token.tokenInfo.expiryTimestamp)

Review Comment:
   ditto



##
core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala:
##
@@ -337,6 +338,53 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
 assertFutureExceptionTypeEquals(results.values.get(emptyResourceNameAcl), classOf[InvalidRequestException])
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateDelegationTokenWithLargeTimeout(quorum: String): Unit = {
+    client = createAdminClient
+    val timeout = Long.MaxValue
+
+    val options = new CreateDelegationTokenOptions().maxlifeTimeMs(timeout)
+    val token = client.createDelegationToken(options).delegationToken().get()
+
+    assertEquals(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_MS_DEFAULT, token.tokenInfo.maxTimestamp - token.tokenInfo.issueTimestamp)
+    assertTrue(token.tokenInfo.maxTimestamp >= token.tokenInfo.expiryTimestamp)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateDelegationTokenWithNegativeTimeout(quorum: String): Unit = {
+    client = createAdminClient
+    val timeout = -1
+
+    val options = new CreateDelegationTokenOptions().maxlifeTimeMs(timeout)
+    val token = client.createDelegationToken(options).delegationToken().get()
+
+    assertEquals(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_MS_DEFAULT, token.tokenInfo.maxTimestamp - token.tokenInfo.issueTimestamp)
+    assertTrue(token.tokenInfo.maxTimestamp >= token.tokenInfo.expiryTimestamp)

Review Comment:
   ditto



##
core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala:
##
@@ -337,6 +338,53 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
 assertFutureExceptionTypeEquals(results.values.get(emptyResourceNameAcl), classOf[InvalidRequestException])
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateDelegationTokenWithLargeTimeout(quorum: String): Unit = {
+    client = createAdminClient
+    val timeout = Long.MaxValue
+
+    val options = new CreateDelegationTokenOptions().maxlifeTimeMs(timeout)
+    val token = client.createDelegationToken(options).delegationToken().get()
+
+    assertEquals(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_MS_DEFAULT, token.tokenInfo.maxTimestamp - token.tokenInfo.issueTimestamp)
+    assertTrue(token.tokenInfo.maxTimestamp >= token.tokenInfo.expiryTimestamp)

Review Comment:
   nit: `token.tokenInfo` has been used four times; I think making it a variable would be better, WDYT?
   ```
   val tokenInfo = client.createDelegationToken(options).delegationToken(

Re: [PR] KAFKA-17449: Move Quota classes to server-common module [kafka]

2024-08-30 Thread via GitHub


chia7712 commented on code in PR #17060:
URL: https://github.com/apache/kafka/pull/17060#discussion_r1738763455


##
server-common/src/main/java/org/apache/kafka/server/quota/QuotaUtils.java:
##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.quota;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.stats.Rate;
+
+/**
+ * Helper functions related to quotas
+ */
+public class QuotaUtils {
+
+/**
+ * This calculates the amount of time needed to bring the observed rate within quota
+ * assuming that no new metrics are recorded.
+ * 
+ * If O is the observed rate and T is the target rate over a window of W, to bring O down to T,
+ * we need to add a delay of X to W such that O * W / (W + X) = T.
+ * Solving for X, we get X = (O - T)/T * W.
+ *
+ * @param timeMs current time in milliseconds
+ * @return Delay in milliseconds
+ */
+public static long throttleTime(QuotaViolationException e, long timeMs) {
+    double difference = e.value() - e.bound();
+    // Use the precise window used by the rate calculation
+    double throttleTimeMs = difference / e.bound() * windowSize(e.metric(), timeMs);
+    return Math.round(throttleTimeMs);
+}
+
+/**
+ * Calculates the amount of time needed to bring the observed rate within quota using the same algorithm as
+ * throttleTime() utility method but the returned value is capped to given maxThrottleTime
+ */
+public static long boundedThrottleTime(QuotaViolationException e, long maxThrottleTime, long timeMs) {
+    return Math.min(throttleTime(e, timeMs), maxThrottleTime);
+}
+
+/**
+ * Returns window size of the given metric
+ *
+ * @param metric metric with measurable of type Rate
+ * @param timeMs current time in milliseconds
+ * @throws IllegalArgumentException if given measurable is not Rate
+ */
+private static long windowSize(KafkaMetric metric, long timeMs) {
+    return measurableAsRate(metric.metricName(), metric.measurable()).windowSize(metric.config(), timeMs);
+}
+
+/**
+ * Casts provided Measurable to Rate
+ * @throws IllegalArgumentException if given measurable is not Rate
+ */
+private static Rate measurableAsRate(MetricName name, Measurable measurable) {
+    if (measurable instanceof Rate) {
+        return (Rate) measurable;
+    } else {
+        throw new IllegalArgumentException("Metric $name is not a Rate metric, value " + measurable);

Review Comment:
   `$name` does not work in java code.
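   For reference, the straightforward fix is plain Java string concatenation (the exact wording is of course up to the PR):
   ```java
   // "$name" is Scala interpolation syntax and is inert inside a Java string:
   throw new IllegalArgumentException("Metric " + name + " is not a Rate metric, value " + measurable);
   ```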



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: add concurrent test for consumer.poll [kafka]

2024-08-30 Thread via GitHub


chia7712 commented on code in PR #17047:
URL: https://github.com/apache/kafka/pull/17047#discussion_r1738898118


##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -3382,7 +3382,43 @@ public void testCommittedThrowsTimeoutExceptionForNoResponse(GroupProtocol group
 assertEquals("Timeout of 1000ms expired before the last committed offset for partitions [test-0] could be determined. " +
 "Try tuning default.api.timeout.ms larger to relax the threshold.", timeoutException.getMessage());
 }
-
+
+@ParameterizedTest
+@EnumSource(value = GroupProtocol.class)
+public void testPreventMultiThread(GroupProtocol groupProtocol) throws InterruptedException {

Review Comment:
   Could you please consider writing it as a UT? https://github.com/apache/kafka/blob/4a3ab89f95aba294bb536af55548522d946d1ee3/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L2428 is a good example.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17374: add bootstrap.controller to kafka-reassign-partitions.sh [kafka]

2024-08-30 Thread via GitHub


chia7712 commented on PR #16964:
URL: https://github.com/apache/kafka/pull/16964#issuecomment-2321570487

   @m1a2st Could you please rebase the code to include #16644?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17449: Move Quota classes to server-common module [kafka]

2024-08-30 Thread via GitHub


mimaison commented on code in PR #17060:
URL: https://github.com/apache/kafka/pull/17060#discussion_r1738925546


##
server-common/src/main/java/org/apache/kafka/server/quota/QuotaUtils.java:
##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.quota;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.stats.Rate;
+
+/**
+ * Helper functions related to quotas
+ */
+public class QuotaUtils {
+
+/**
+ * This calculates the amount of time needed to bring the observed rate within quota
+ * assuming that no new metrics are recorded.
+ * 
+ * If O is the observed rate and T is the target rate over a window of W, to bring O down to T,
+ * we need to add a delay of X to W such that O * W / (W + X) = T.
+ * Solving for X, we get X = (O - T)/T * W.
+ *
+ * @param timeMs current time in milliseconds
+ * @return Delay in milliseconds
+ */
+public static long throttleTime(QuotaViolationException e, long timeMs) {
+    double difference = e.value() - e.bound();
+    // Use the precise window used by the rate calculation
+    double throttleTimeMs = difference / e.bound() * windowSize(e.metric(), timeMs);
+    return Math.round(throttleTimeMs);
+}
+
+/**
+ * Calculates the amount of time needed to bring the observed rate within quota using the same algorithm as
+ * throttleTime() utility method but the returned value is capped to given maxThrottleTime
+ */
+public static long boundedThrottleTime(QuotaViolationException e, long maxThrottleTime, long timeMs) {
+    return Math.min(throttleTime(e, timeMs), maxThrottleTime);
+}
+
+/**
+ * Returns window size of the given metric
+ *
+ * @param metric metric with measurable of type Rate
+ * @param timeMs current time in milliseconds
+ * @throws IllegalArgumentException if given measurable is not Rate
+ */
+private static long windowSize(KafkaMetric metric, long timeMs) {
+    return measurableAsRate(metric.metricName(), metric.measurable()).windowSize(metric.config(), timeMs);
+}
+
+/**
+ * Casts provided Measurable to Rate
+ * @throws IllegalArgumentException if given measurable is not Rate
+ */
+private static Rate measurableAsRate(MetricName name, Measurable measurable) {
+    if (measurable instanceof Rate) {
+        return (Rate) measurable;
+    } else {
+        throw new IllegalArgumentException("Metric $name is not a Rate metric, value " + measurable);

Review Comment:
   Oops, I missed this one!
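   As a quick sanity check on the throttle-time formula quoted above, with illustrative numbers only:
   ```java
   // X = (O - T) / T * W: an observed rate of 15 MB/s against a 10 MB/s quota
   // over a 30 s rate window needs a 15 s delay to fall back within quota.
   double observed = 15.0, quota = 10.0, windowSeconds = 30.0;
   double delaySeconds = (observed - quota) / quota * windowSeconds; // 15.0
   ```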



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15909: Throw error when consumer configured with empty/whitespace-only group.id for LegacyKafkaConsumer [kafka]

2024-08-30 Thread via GitHub


chia7712 merged PR #16933:
URL: https://github.com/apache/kafka/pull/16933


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15909) Throw error when consumer configured with empty/whitespace-only group.id for LegacyKafkaConsumer

2024-08-30 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-15909.

Resolution: Fixed

> Throw error when consumer configured with empty/whitespace-only group.id for 
> LegacyKafkaConsumer
> 
>
> Key: KAFKA-15909
> URL: https://issues.apache.org/jira/browse/KAFKA-15909
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: PoAn Yang
>Priority: Major
> Fix For: 4.0.0
>
>
> Per 
> [KIP-289|https://cwiki.apache.org/confluence/display/KAFKA/KIP-289%3A+Improve+the+default+group+id+behavior+in+KafkaConsumer],
>  the use of an empty value for {{group.id}} configuration was deprecated back 
> in 2.2.0.
> In 3.7, the {{AsyncKafkaConsumer}} implementation will throw an error (see 
> KAFKA-14438).
> This task is to update the {{LegacyKafkaConsumer}} implementation to throw an 
> error in 4.0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17428) remote segments deleted in RLMCopyTask stays DELETE_SEGMENT_STARTED state

2024-08-30 Thread Federico Valeri (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Federico Valeri updated KAFKA-17428:

Summary: remote segments deleted in RLMCopyTask stays 
DELETE_SEGMENT_STARTED state  (was: remote segments deleted in RLMCopyTask 
stays `COPY_SEGMENT_START` state)

> remote segments deleted in RLMCopyTask stays DELETE_SEGMENT_STARTED state
> -
>
> Key: KAFKA-17428
> URL: https://issues.apache.org/jira/browse/KAFKA-17428
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Federico Valeri
>Priority: Major
>  Labels: kip-required
>
> Currently, in copyLogSegment in RLMCopyTask, we delete segments whose upload 
> failed and segments whose custom metadata size was exceeded. But after 
> deletion, these segments are still in the COPY_SEGMENT_STARTED state. That 
> "might" cause unexpected issues in the future. We'd better move the state from 
> {{COPY_SEGMENT_STARTED}} -> {{DELETE_SEGMENT_STARTED}} -> 
> {{DELETE_SEGMENT_FINISHED}}
>  
> updated:
> I thought about this when I first had a look at it and one thing that 
> bothered me is that {{DELETE_SEGMENT_STARTED}} means to me that we're now in 
> a state where we attempt deletion. However if the remote store is down and we 
> fail to copy and delete we will leave that segment in 
> {{DELETE_SEGMENT_STARTED}} and not attempt to delete it till the segment 
> itself breaches retention.ms/bytes.
> We can probably just make it clearer but that was my thought at the time.
> So, maybe when in deletion loop, we can add {{DELETE_SEGMENT_STARTED}} 
> segments into deletion directly, but that also needs to consider the 
> retention size calculation.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17428) Remote segments stay in COPY_SEGMENT_STARTED state after RLMCopyTask fails to upload

2024-08-30 Thread Federico Valeri (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Federico Valeri updated KAFKA-17428:

Summary: Remote segments stay in COPY_SEGMENT_STARTED state after 
RLMCopyTask fails to upload  (was: Remote segments stay in 
DELETE_SEGMENT_STARTED state after RLMCopyTask fails to upload)

> Remote segments stay in COPY_SEGMENT_STARTED state after RLMCopyTask fails to 
> upload
> 
>
> Key: KAFKA-17428
> URL: https://issues.apache.org/jira/browse/KAFKA-17428
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Federico Valeri
>Priority: Major
>  Labels: kip-required
>
> Currently, in copyLogSegment in RLMCopyTask, we delete segments whose upload 
> failed and segments whose custom metadata size was exceeded. But after 
> deletion, these segments are still in the COPY_SEGMENT_STARTED state. That 
> "might" cause unexpected issues in the future. We'd better move the state from 
> {{COPY_SEGMENT_STARTED}} -> {{DELETE_SEGMENT_STARTED}} -> 
> {{DELETE_SEGMENT_FINISHED}}
>  
> updated:
> I thought about this when I first had a look at it and one thing that 
> bothered me is that {{DELETE_SEGMENT_STARTED}} means to me that we're now in 
> a state where we attempt deletion. However if the remote store is down and we 
> fail to copy and delete we will leave that segment in 
> {{DELETE_SEGMENT_STARTED}} and not attempt to delete it till the segment 
> itself breaches retention.ms/bytes.
> We can probably just make it clearer but that was my thought at the time.
> So, maybe when in deletion loop, we can add {{DELETE_SEGMENT_STARTED}} 
> segments into deletion directly, but that also needs to consider the 
> retention size calculation.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17428) Remote segments stay in DELETE_SEGMENT_STARTED state after RLMCopyTask fails to upload

2024-08-30 Thread Federico Valeri (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Federico Valeri updated KAFKA-17428:

Summary: Remote segments stay in DELETE_SEGMENT_STARTED state after 
RLMCopyTask fails to upload  (was: remote segments deleted in RLMCopyTask stays 
DELETE_SEGMENT_STARTED state)

> Remote segments stay in DELETE_SEGMENT_STARTED state after RLMCopyTask fails 
> to upload
> --
>
> Key: KAFKA-17428
> URL: https://issues.apache.org/jira/browse/KAFKA-17428
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Federico Valeri
>Priority: Major
>  Labels: kip-required
>
> Currently, in copyLogSegment in RLMCopyTask, we delete segments whose upload 
> failed and segments whose custom metadata size was exceeded. But after 
> deletion, these segments are still in the COPY_SEGMENT_STARTED state. That 
> "might" cause unexpected issues in the future. We'd better move the state from 
> {{COPY_SEGMENT_STARTED}} -> {{DELETE_SEGMENT_STARTED}} -> 
> {{DELETE_SEGMENT_FINISHED}}
>  
> updated:
> I thought about this when I first had a look at it and one thing that 
> bothered me is that {{DELETE_SEGMENT_STARTED}} means to me that we're now in 
> a state where we attempt deletion. However if the remote store is down and we 
> fail to copy and delete we will leave that segment in 
> {{DELETE_SEGMENT_STARTED}} and not attempt to delete it till the segment 
> itself breaches retention.ms/bytes.
> We can probably just make it clearer but that was my thought at the time.
> So, maybe when in deletion loop, we can add {{DELETE_SEGMENT_STARTED}} 
> segments into deletion directly, but that also needs to consider the 
> retention size calculation.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17451) Remove deprecated Consumer#committed

2024-08-30 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-17451:
--

 Summary: Remove deprecated Consumer#committed
 Key: KAFKA-17451
 URL: https://issues.apache.org/jira/browse/KAFKA-17451
 Project: Kafka
  Issue Type: Sub-task
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


The APIs were deprecated by KAFKA-8880, back in 2.4.0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-17449: Move Quota classes to server-common module [kafka]

2024-08-30 Thread via GitHub


chia7712 commented on PR #17060:
URL: https://github.com/apache/kafka/pull/17060#issuecomment-2321633915

   @mimaison Could you please fix the conflicts :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Back-port KAFKA-16230 to 3.7 branch [kafka]

2024-08-30 Thread via GitHub


lianetm commented on PR #16951:
URL: https://github.com/apache/kafka/pull/16951#issuecomment-2321721217

   Hey @kirktrue , took a first look and overall it looks good. Is there a run 
of the system tests with this change? (agree that failures in PlainTextConsumer 
are unrelated to this PR). Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17449: Move Quota classes to server-common module [kafka]

2024-08-30 Thread via GitHub


mimaison commented on PR #17060:
URL: https://github.com/apache/kafka/pull/17060#issuecomment-2321731110

   Yup, just rebased. Let's wait for the CI to run again


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-17451) Remove deprecated Consumer#committed

2024-08-30 Thread Dmitry Werner (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878173#comment-17878173
 ] 

Dmitry Werner commented on KAFKA-17451:
---

[~chia7712] Hello, can I take it?

> Remove deprecated Consumer#committed
> 
>
> Key: KAFKA-17451
> URL: https://issues.apache.org/jira/browse/KAFKA-17451
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> The APIs were deprecated by KAFKA-8880, back in 2.4.0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-17137: Feat admin client it acl configs [kafka]

2024-08-30 Thread via GitHub


chia7712 commented on code in PR #16648:
URL: https://github.com/apache/kafka/pull/16648#discussion_r1739046762


##
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##
@@ -96,6 +96,50 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
 super.tearDown()
   }
 
+  @ParameterizedTest
+  @Timeout(30)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeConfigWithOptionTimeoutMs(quorum: String): Unit = {
+    val config = createConfig
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s"localhost:${TestUtils.IncorrectBrokerPort}")
+    val brokenClient = Admin.create(config)
+
+    try {
+      // Describe and broker
+      val brokerResource1 = new ConfigResource(ConfigResource.Type.BROKER, brokers(1).config.brokerId.toString)
+      val brokerResource2 = new ConfigResource(ConfigResource.Type.BROKER, brokers(2).config.brokerId.toString)
+      val configResources = Seq(brokerResource1, brokerResource2)
+
+      val exception = assertThrows(classOf[ExecutionException], () => {
+        brokenClient.describeConfigs(configResources.asJava, new DescribeConfigsOptions().timeoutMs(0)).all().get()
+      })
+      assertInstanceOf(classOf[TimeoutException], exception.getCause)
+    } finally brokenClient.close(time.Duration.ZERO)
+  }
+
+  @ParameterizedTest
+  @Timeout(30)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAlterConfigsWithOptionTimeoutMs(quorum: String): Unit = {
+    client = createAdminClient
+    val config = createConfig
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s"localhost:${TestUtils.IncorrectBrokerPort}")
+    val brokenClient = Admin.create(config)
+
+    try {
+      val alterLogLevelsEntries = Seq(
+        new ConfigEntry("kafka.controller.KafkaController", LogLevelConfig.INFO_LOG_LEVEL)
+      ).asJavaCollection
+
+      val exception = assertThrows(classOf[ExecutionException], () => {
+        brokenClient.alterConfigs(
+          Map(brokerLoggerConfigResource -> new Config(alterLogLevelsEntries)).asJava,
+          new AlterConfigsOptions().timeoutMs(0)).all()

Review Comment:
   please call `.get()`
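   For anyone skimming: `all()` only returns a KafkaFuture, so nothing is thrown until the future is unwrapped; `get()` is what rethrows the timeout as the ExecutionException the test asserts on. Sketched against the generic Admin API (variable names are placeholders):
   ```java
   KafkaFuture<Void> future = admin.alterConfigs(configs, new AlterConfigsOptions().timeoutMs(0)).all();
   future.get(); // throws ExecutionException wrapping the TimeoutException
   ```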



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-17451) Remove deprecated Consumer#committed

2024-08-30 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878180#comment-17878180
 ] 

Chia-Ping Tsai commented on KAFKA-17451:


[~javakillah] oh, sorry, my team member is already working on that :)

> Remove deprecated Consumer#committed
> 
>
> Key: KAFKA-17451
> URL: https://issues.apache.org/jira/browse/KAFKA-17451
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> The APIs were deprecated by KAFKA-8880, which dates back to 2.4.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16863 : Deprecate default exception handlers [kafka]

2024-08-30 Thread via GitHub


mjsax commented on PR #17005:
URL: https://github.com/apache/kafka/pull/17005#issuecomment-2321839005

   Yeah. Looks related. Can you take a look and push a fix?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-17452) Fix flaky QuorumControllerTest#testUncleanShutdownBroker

2024-08-30 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-17452:
--

 Summary: Fix flaky QuorumControllerTest#testUncleanShutdownBroker
 Key: KAFKA-17452
 URL: https://issues.apache.org/jira/browse/KAFKA-17452
 Project: Kafka
  Issue Type: Test
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai



{code:java}
Error
org.opentest4j.AssertionFailedError: PartitionRegistration(replicas=[3, 1, 2], directories=[AA, AA, AA], isr=[3], removingReplicas=[], addingReplicas=[], elr=[], lastKnownElr=[], leader=-1, leaderRecoveryState=RECOVERED, leaderEpoch=3, partitionEpoch=3) ==> expected: <1> but was: <0>
Stacktrace
org.opentest4j.AssertionFailedError: PartitionRegistration(replicas=[3, 1, 2], directories=[AA, AA, AA], isr=[3], removingReplicas=[], addingReplicas=[], elr=[], lastKnownElr=[], leader=-1, leaderRecoveryState=RECOVERED, leaderEpoch=3, partitionEpoch=3) ==> expected: <1> but was: <0>
	at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
	at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
	at org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
	at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:150)
	at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:563)
	at org.apache.kafka.controller.QuorumControllerTest.testUncleanShutdownBroker(QuorumControllerTest.java:427)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.util.ArrayList.forEach(ArrayList.java:1259)
	at java.util.ArrayList.forEach(ArrayList.java:1259)
	Suppressed: org.apache.kafka.server.fault.FaultHandlerException: fatalFaultHandler: exception while renouncing leadership: Attempt to resign from epoch 1 which is larger than the current epoch 0
		at org.apache.kafka.metalog.LocalLogManager.resign(LocalLogManager.java:752)
		at org.apache.kafka.controller.QuorumController.renounce(QuorumController.java:1268)
		at org.apache.kafka.controller.QuorumController.handleEventException(QuorumController.java:545)
		at org.apache.kafka.controller.QuorumController.access$800(QuorumController.java:180)
		at org.apache.kafka.controller.QuorumController$ControllerWriteEvent.complete(QuorumController.java:878)
		at org.apache.kafka.controller.QuorumController$ControllerWriteEvent.handleException(QuorumController.java:868)
		at org.apache.kafka.queue.KafkaEventQueue$EventContext.completeWithException(KafkaEventQueue.java:153)
		at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:142)
		at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:215)
		at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:186)
		at java.lang.Thread.run(Thread.java:750)
	Caused by: java.lang.IllegalArgumentException: Attempt to resign from epoch 1 which is larger than the current epoch 0
		... 11 more
{code}




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17453) Fix flaky PlaintextConsumerFetchTest#testFetchOutOfRangeOffsetResetConfigLatest

2024-08-30 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-17453:
--

 Summary: Fix flaky 
PlaintextConsumerFetchTest#testFetchOutOfRangeOffsetResetConfigLatest
 Key: KAFKA-17453
 URL: https://issues.apache.org/jira/browse/KAFKA-17453
 Project: Kafka
  Issue Type: Test
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


{code:java}
Errororg.opentest4j.AssertionFailedError: Failed to observe commit callback 
before timeoutStacktraceorg.opentest4j.AssertionFailedError: Failed to observe 
commit callback before timeout   at 
kafka.api.AbstractConsumerTest.sendAndAwaitAsyncCommit(AbstractConsumerTest.scala:293)
   at 
kafka.api.AbstractConsumerTest.ensureNoRebalance(AbstractConsumerTest.scala:403)
 at 
kafka.api.BaseConsumerTest.testCoordinatorFailover(BaseConsumerTest.scala:110)  
 at java.lang.reflect.Method.invoke(Method.java:498) at 
java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)  at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)at 
java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)at 
java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)  at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)at 
java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)  at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)at 
java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) 
 at 
java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647)  at 
java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)at 
java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) 
 at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)   
 at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at 
java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)   
 at 
java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
  at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)  
  at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) 
  at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)  
  at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)  
  at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)  
  at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)  
  at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)  
 at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)   
 at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at 
java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)   
 at 
java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
  at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)  
  at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) 
  at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)  
  at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)  
 at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)   
 at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at 
java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)   
 at 
java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
  at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)  
  at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) 
  at java.util.ArrayList.forEach(ArrayList.java:1259) at 
java.util.ArrayList.forEach(ArrayList.java:1259) {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17453) Fix flaky PlaintextConsumerFetchTest#testFetchOutOfRangeOffsetResetConfigLatest

2024-08-30 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-17453:
---
Description: 
{code:java}
Error
java.util.NoSuchElementException
Stacktrace
java.util.NoSuchElementException
	at org.apache.kafka.common.utils.AbstractIterator.next(AbstractIterator.java:52)
	at kafka.api.PlaintextConsumerFetchTest.testFetchOutOfRangeOffsetResetConfigLatest(PlaintextConsumerFetchTest.scala:104)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647)
	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
	at java.util.ArrayList.forEach(ArrayList.java:1259)
	at java.util.ArrayList.forEach(ArrayList.java:1259)
{code}

  was:
{code:java}
Error
org.opentest4j.AssertionFailedError: Failed to observe commit callback before timeout
Stacktrace
org.opentest4j.AssertionFailedError: Failed to observe commit callback before timeout
	at kafka.api.AbstractConsumerTest.sendAndAwaitAsyncCommit(AbstractConsumerTest.scala:293)
	at kafka.api.AbstractConsumerTest.ensureNoRebalance(AbstractConsumerTest.scala:403)
	at kafka.api.BaseConsumerTest.testCoordinatorFailover(BaseConsumerTest.scala:110)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
	at java.uti

Re: [PR] KAFKA-17331: Throw unsupported version exception if the server does NOT support EarliestLocalSpec and LatestTieredSpec [kafka]

2024-08-30 Thread via GitHub


chia7712 commented on PR #16873:
URL: https://github.com/apache/kafka/pull/16873#issuecomment-2321891055

   The failed tests are flaky; the related Jira tickets are listed below.
   https://issues.apache.org/jira/browse/KAFKA-17265
   https://issues.apache.org/jira/browse/KAFKA-16174
   https://issues.apache.org/jira/browse/KAFKA-16993
   https://issues.apache.org/jira/browse/KAFKA-17364
   https://issues.apache.org/jira/browse/KAFKA-17452
   https://issues.apache.org/jira/browse/KAFKA-16601
   https://issues.apache.org/jira/browse/KAFKA-15103
   https://issues.apache.org/jira/browse/KAFKA-7648
   https://issues.apache.org/jira/browse/KAFKA-16024
   https://issues.apache.org/jira/browse/KAFKA-17453
   https://issues.apache.org/jira/browse/KAFKA-15146
   https://issues.apache.org/jira/browse/KAFKA-17395
   https://issues.apache.org/jira/browse/KAFKA-8115
   https://issues.apache.org/jira/browse/KAFKA-15529
   https://issues.apache.org/jira/browse/KAFKA-16634


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17331: Throw unsupported version exception if the server does NOT support EarliestLocalSpec and LatestTieredSpec [kafka]

2024-08-30 Thread via GitHub


chia7712 commented on PR #16873:
URL: https://github.com/apache/kafka/pull/16873#issuecomment-2321894140

   I will merge this PR to trunk and 3.9 tomorrow if there are no objections


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17331: Throw unsupported version exception if the server does NOT support EarliestLocalSpec and LatestTieredSpec [kafka]

2024-08-30 Thread via GitHub


junrao commented on code in PR #16873:
URL: https://github.com/apache/kafka/pull/16873#discussion_r1739140459


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1104,35 +1104,43 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val responseTopics = authorizedRequestInfo.map { topic =>
       val responsePartitions = topic.partitions.asScala.map { partition =>
-        val topicPartition = new TopicPartition(topic.name, partition.partitionIndex)
-
-        try {
-          val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
-            topicPartition = topicPartition,
-            timestamp = partition.timestamp,
-            maxNumOffsets = partition.maxNumOffsets,
-            isFromConsumer = offsetRequest.replicaId == ListOffsetsRequest.CONSUMER_REPLICA_ID,
-            fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetsRequest.DEBUGGING_REPLICA_ID)
+        if (partition.timestamp() < ListOffsetsRequest.EARLIEST_TIMESTAMP) {

Review Comment:
   Should we remove the code related to `EARLIEST_LOCAL_TIMESTAMP` in 
`UnifiedLog.legacyFetchOffsetsBefore`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16792: Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0) [kafka]

2024-08-30 Thread via GitHub


lianetm commented on code in PR #16982:
URL: https://github.com/apache/kafka/pull/16982#discussion_r1739141821


##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -1053,12 +1053,20 @@ public void testFetchStableOffsetThrowInCommitted(GroupProtocol groupProtocol) {
         assertThrows(UnsupportedVersionException.class, () -> setupThrowableConsumer(groupProtocol).committed(Collections.singleton(tp0)));
     }
 
-    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
-    //       The bug will be investigated and fixed so this test can use both group protocols.
     @ParameterizedTest
-    @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
-    public void testFetchStableOffsetThrowInPoll(GroupProtocol groupProtocol) {
-        assertThrows(UnsupportedVersionException.class, () -> setupThrowableConsumer(groupProtocol).poll(Duration.ZERO));
+    @EnumSource(GroupProtocol.class)
+    public void testFetchStableOffsetThrowInPoll(GroupProtocol groupProtocol) throws InterruptedException {
+        setupThrowableConsumer(groupProtocol);
+        TestUtils.waitForCondition(() -> {

Review Comment:
   This change still has me thinking. This test is about a single call to `poll(ZERO)` that is expected to throw an exception, but the interesting fact is that the exception is generated when building the request [here](https://github.com/apache/kafka/blob/70dd577286de31ef20dc4f198e95f9b9e4479b47/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java#L146) (it does not require the actual send or response). So I wonder if the async consumer should somehow ensure that, when poll returns (even with a low timeout), it has allowed for at least one run of the background thread's runOnce?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)

2024-08-30 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878191#comment-17878191
 ] 

Lianet Magrans commented on KAFKA-16792:


Thinking more about this, I wonder if we could make things slightly better and 
more consistent with the classic consumer if we ensure that consumer.poll 
allows for at least one run of the background thread before returning. With 
that we would guarantee that poll(lowTimeout) generates the requests it needs 
(just like the classic consumer does), even though it may not have time to 
wait for them. Note that this would instantly align the behaviour in tests like 
testFetchStableOffsetThrowInPoll, see my comment 
[there|https://github.com/apache/kafka/pull/16982/files#r1739141821]. Maybe a 
PollEvent that would do nothing in the background other than complete, but 
would serve as a signal to the foreground that there has been one full cycle in 
the background; a rough sketch of the idea follows. (Just rough ideas for now, 
this requires more thinking. Thoughts?)
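A minimal, self-contained sketch of that signalling idea (all names here are hypothetical and invented for illustration; this is not the actual consumer implementation):

{code:java}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

public class PollBarrierSketch {
    // Marker event: carries no work, it only signals one full background cycle.
    static final class PollEvent {
        final CompletableFuture<Void> done = new CompletableFuture<>();
    }

    private final BlockingQueue<PollEvent> queue = new LinkedBlockingQueue<>();

    // Background thread: each iteration of this loop is one "runOnce" cycle.
    void runOnce() throws InterruptedException {
        PollEvent event = queue.take();
        // ... generate any pending requests here, then acknowledge the cycle:
        event.done.complete(null);
    }

    // Foreground poll(): even with a zero timeout, wait until the background
    // thread has completed at least one cycle, so requests get generated just
    // like in the classic consumer.
    void poll() {
        PollEvent event = new PollEvent();
        queue.add(event);
        event.done.join();
    }
}
{code}

With this shape, even a zero-timeout poll pays for exactly one background cycle, so the request (and any exception raised while building it) is generated just as in the classic consumer, even if the caller does not wait for the response.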

> Enable consumer unit tests that fail to fetch offsets only for new consumer 
> with poll(0)
> 
>
> Key: KAFKA-16792
> URL: https://issues.apache.org/jira/browse/KAFKA-16792
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lianet Magrans
>Assignee: PoAn Yang
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 4.0.0
>
>
> Enable the following unit tests for the new async consumer in 
> KafkaConsumerTest:
>  - testCurrentLag
>  - testFetchStableOffsetThrowInPoll
>  - testListOffsetShouldUpdateSubscriptions
>  - testPollReturnsRecords
>  - testResetToCommittedOffset
>  - testResetUsingAutoResetPolicy
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16758) Extend Consumer#close with option to leave the group or not

2024-08-30 Thread Lucas Brutschy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878220#comment-17878220
 ] 

Lucas Brutschy commented on KAFKA-16758:


Hey lianet, are you still planning to do this?

> Extend Consumer#close with option to leave the group or not
> ---
>
> Key: KAFKA-16758
> URL: https://issues.apache.org/jira/browse/KAFKA-16758
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: A. Sophie Blee-Goldman
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: needs-kip
> Fix For: 4.0.0
>
>
> See comments on https://issues.apache.org/jira/browse/KAFKA-16514 for the 
> full context.
> Essentially we would get rid of the "internal.leave.group.on.close" config 
> that is used as a backdoor by Kafka Streams right now to prevent closed 
> consumers from leaving the group, thus reducing unnecessary task movements 
> after a simple bounce. 
> This would be replaced by an actual public API that would allow the caller to 
> opt in or out of the LeaveGroup when close is called. This would be similar 
> to the KafkaStreams#close(CloseOptions) API, and in fact would be how that 
> API will be implemented (since it only works for static groups at the moment 
> as noted in KAFKA-16514 )
> This has several benefits over the current situation:
>  # It allows plain consumer apps to opt-out of leaving the group when closed, 
> which is currently not possible through any public API (only an internal 
> backdoor config)
>  # It enables the caller to dynamically select the appropriate action 
> depending on why the client is being closed – for example, you would not want 
> the consumer to leave the group during a simple restart, but would want it to 
> leave the group when shutting down the app or if scaling down the node. This 
> is not possible today, even with the internal config, since configs are 
> immutable
>  # It can be leveraged to "fix" the KafkaStreams#close(closeOptions) API so 
> that the user's choice to leave the group during close will be respected for 
> non-static members
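
For illustration, a hypothetical sketch of what such a close option could look like (all names below are invented for this example; the actual API surface would be defined by the KIP):

{code:java}
import java.time.Duration;

public final class CloseOptions {
    public enum GroupMembership { LEAVE_GROUP, REMAIN_IN_GROUP }

    private final GroupMembership membership;
    private final Duration timeout;

    private CloseOptions(GroupMembership membership, Duration timeout) {
        this.membership = membership;
        this.timeout = timeout;
    }

    // Leave the group on close, e.g. when shutting down or scaling in.
    public static CloseOptions leaveGroup(Duration timeout) {
        return new CloseOptions(GroupMembership.LEAVE_GROUP, timeout);
    }

    // Stay in the group on close, e.g. during a simple rolling restart.
    public static CloseOptions remainInGroup(Duration timeout) {
        return new CloseOptions(GroupMembership.REMAIN_IN_GROUP, timeout);
    }

    public GroupMembership membership() { return membership; }
    public Duration timeout() { return timeout; }
}
{code}

A caller could then pick the behaviour per shutdown cause, e.g. consumer.close(CloseOptions.remainInGroup(Duration.ofSeconds(30))) during a bounce and CloseOptions.leaveGroup(...) when scaling down.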



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16758) Extend Consumer#close with option to leave the group or not

2024-08-30 Thread Lucas Brutschy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878220#comment-17878220
 ] 

Lucas Brutschy edited comment on KAFKA-16758 at 8/30/24 6:19 PM:
-

Hey [~lianetm], are you still planning to do this?


was (Author: JIRAUSER302322):
Hey lianet, are you still planning to do this?

> Extend Consumer#close with option to leave the group or not
> ---
>
> Key: KAFKA-16758
> URL: https://issues.apache.org/jira/browse/KAFKA-16758
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: A. Sophie Blee-Goldman
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: needs-kip
> Fix For: 4.0.0
>
>
> See comments on https://issues.apache.org/jira/browse/KAFKA-16514 for the 
> full context.
> Essentially we would get rid of the "internal.leave.group.on.close" config 
> that is used as a backdoor by Kafka Streams right now to prevent closed 
> consumers from leaving the group, thus reducing unnecessary task movements 
> after a simple bounce. 
> This would be replaced by an actual public API that would allow the caller to 
> opt in or out of the LeaveGroup when close is called. This would be similar 
> to the KafkaStreams#close(CloseOptions) API, and in fact would be how that 
> API will be implemented (since it only works for static groups at the moment 
> as noted in KAFKA-16514 )
> This has several benefits over the current situation:
>  # It allows plain consumer apps to opt-out of leaving the group when closed, 
> which is currently not possible through any public API (only an internal 
> backdoor config)
>  # It enables the caller to dynamically select the appropriate action 
> depending on why the client is being closed – for example, you would not want 
> the consumer to leave the group during a simple restart, but would want it to 
> leave the group when shutting down the app or if scaling down the node. This 
> is not possible today, even with the internal config, since configs are 
> immutable
>  # It can be leveraged to "fix" the KafkaStreams#close(closeOptions) API so 
> that the user's choice to leave the group during close will be respected for 
> non-static members



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-17400: Added share fetch purgatory for delaying share fetch requests which cannot be completed because of reaching record lock partition limit [kafka]

2024-08-30 Thread via GitHub


junrao commented on code in PR #16969:
URL: https://github.com/apache/kafka/pull/16969#discussion_r1739168653


##
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.share;
+
+import kafka.server.DelayedOperation;
+import kafka.server.LogReadResult;
+import kafka.server.QuotaFactory;
+import kafka.server.ReplicaManager;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.storage.internals.log.FetchPartitionData;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.Seq;
+import scala.jdk.javaapi.CollectionConverters;
+import scala.runtime.BoxedUnit;
+
+/**
+ * A delayed share fetch operation has been introduced in case there is no share partition for which we can acquire records. We will try to wait
+ * for MaxWaitMs for records to be released else complete the share fetch request.
+ */
+public class DelayedShareFetch extends DelayedOperation {
+    private final SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData;
+    private final ReplicaManager replicaManager;
+    private final Map partitionCacheMap;
+    private final Map topicPartitionDataFromTryComplete = new LinkedHashMap<>();
+
+    private static final Logger log = LoggerFactory.getLogger(DelayedShareFetch.class);
+
+    DelayedShareFetch(
+            SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData,
+            ReplicaManager replicaManager,
+            Map partitionCacheMap) {
+        super(shareFetchPartitionData.fetchParams().maxWaitMs, Option.empty());
+        this.shareFetchPartitionData = shareFetchPartitionData;
+        this.replicaManager = replicaManager;
+        this.partitionCacheMap = partitionCacheMap;
+    }
+
+    @Override
+    public void onExpiration() {
+    }
+
+    /**
+     * Complete the share fetch operation by fetching records for all partitions in the share fetch request irrespective
+     * of whether they have any acquired records. This is called when the fetch operation is forced to complete either

Review Comment:
   > irrespective of whether they have any acquired records.
   
   This seems outdated?
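
   For readers skimming this thread, a simplified standalone sketch of the purgatory pattern this class plugs into (an illustration only, not Kafka's actual `DelayedOperation` API; `criteriaMet` and `onComplete` are invented names):

   ```java
   import java.util.concurrent.atomic.AtomicBoolean;

   abstract class DelayedOperationSketch {
       private final AtomicBoolean completed = new AtomicBoolean(false);

       // Attempt immediate completion; otherwise the operation stays watched.
       boolean tryComplete() {
           if (criteriaMet() && completed.compareAndSet(false, true)) {
               onComplete();
               return true;
           }
           return false;
       }

       // Called by a timer when maxWaitMs elapses before the criteria are met.
       void onExpiration() {
           if (completed.compareAndSet(false, true)) {
               onComplete(); // respond with whatever is available
           }
       }

       abstract boolean criteriaMet();
       abstract void onComplete();
   }
   ```

   Roughly, a delayed share fetch would treat "some share partition lock can be acquired" as the completion criteria and "fetch records and respond" as the completion action.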



##
server/src/main/java/org/apache/kafka/server/config/ShareGroupConfig.java:
##
@@ -63,14 +63,19 @@ public class ShareGroupConfig {
     public static final int SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DEFAULT = 6;
     public static final String SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DOC = "The record acquisition lock maximum duration in milliseconds for share groups.";
 
+    public static final String SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG = "share.fetch.purgatory.purge.interval.requests";

Review Comment:
   This config is not in the KIP. So we need to update the KIP.



##
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##
@@ -564,16 +592,29 @@ void maybeProcessFetchQueue() {
                     );
                 } else {
                     sharePartition.releaseFetchLock();
-                    log.info("Record lock partition limit exceeded for SharePartition with key {}, " +
-                        "cannot acquire more records", sharePartitionKey);
                 }
             }
         });
 
-        if (topicPartitionData.isEmpty()) {
-            // No locks for share partitions could be acquired, so we complete the request and
-            // will re-fetch for the client in next poll.
+        if (shareFetchPartitionData.partitionMaxBytes.isEmpty()) {

Review Comment:
   Should we move this check earlier?



##
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##
@@ -564,16 +592,29 @@ void maybeProcessFetchQueue() {

[jira] [Created] (KAFKA-17454) Fix failed transactions_mixed_versions_test.py when running with 3.2

2024-08-30 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-17454:
--

 Summary: Fix failed transactions_mixed_versions_test.py when 
running with 3.2
 Key: KAFKA-17454
 URL: https://issues.apache.org/jira/browse/KAFKA-17454
 Project: Kafka
  Issue Type: Test
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


The 3.2.3 release does not include KAFKA-14259, so it produces the exception 
shown below when the log level is DEBUG. Hence, we should change the log level 
from DEBUG to INFO for 3.2.3.

For example, add `self.kafka.log_level = "INFO"`.

{code:java}
Caused by: java.lang.ClassCastException: java.util.HashMap$Node cannot be cast 
to java.lang.Comparable
at 
java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:47)
at java.util.TimSort.countRunAndMakeAscending(TimSort.java:355)
at java.util.TimSort.sort(TimSort.java:220)
at java.util.Arrays.sort(Arrays.java:1512)
at 
java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:353)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:483)
at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at 
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at 
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
at 
org.apache.kafka.metadata.BrokerRegistration.toString(BrokerRegistration.java:212)
at java.util.Formatter$FormatSpecifier.printString(Formatter.java:2886)
at java.util.Formatter$FormatSpecifier.print(Formatter.java:2763)
at java.util.Formatter.format(Formatter.java:2520)
at java.util.Formatter.format(Formatter.java:2455)
at java.lang.String.format(String.java:2940)
at java.util.Optional.toString(Optional.java:346)
at java.lang.String.valueOf(String.java:2994)
at java.lang.StringBuilder.append(StringBuilder.java:136)
at java.util.AbstractMap.toString(AbstractMap.java:559)
at java.lang.String.valueOf(String.java:2994)
at java.lang.StringBuilder.append(StringBuilder.java:136)
at org.apache.kafka.image.ClusterDelta.toString(ClusterDelta.java:132)
at java.lang.String.valueOf(String.java:2994)
at java.lang.StringBuilder.append(StringBuilder.java:136)
at org.apache.kafka.image.MetadataDelta.toString(MetadataDelta.java:353)
at java.lang.String.valueOf(String.java:2994)
at java.lang.StringBuilder.append(StringBuilder.java:136)
at 
kafka.server.metadata.BrokerMetadataListener.$anonfun$publish$1(BrokerMetadataListener.scala:274)
at kafka.utils.Logging.debug(Logging.scala:62)
at kafka.utils.Logging.debug$(Logging.scala:62)
at 
kafka.server.metadata.BrokerMetadataListener.debug(BrokerMetadataListener.scala:37)
at 
kafka.server.metadata.BrokerMetadataListener.kafka$server$metadata$BrokerMetadataListener$$publish(BrokerMetadataListener.scala:274)
at 
kafka.server.metadata.BrokerMetadataListener$StartPublishingEvent.run(BrokerMetadataListener.scala:241)
at 
org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121)
at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200)
at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173)
at java.lang.Thread.run(Thread.java:750)
{code}




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] ISSUE-1014 Should fail in the queue [kafka-merge-queue-sandbox]

2024-08-30 Thread via GitHub


mumrah closed pull request #21: ISSUE-1014 Should fail in the queue
URL: https://github.com/apache/kafka-merge-queue-sandbox/pull/21


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Update README.md [kafka-merge-queue-sandbox]

2024-08-30 Thread via GitHub


mumrah opened a new pull request, #43:
URL: https://github.com/apache/kafka-merge-queue-sandbox/pull/43

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-17454) Fix failed transactions_mixed_versions_test.py when running with 3.2

2024-08-30 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-17454:
---
Fix Version/s: 4.0.0
   3.9.0

> Fix failed transactions_mixed_versions_test.py when running with 3.2
> 
>
> Key: KAFKA-17454
> URL: https://issues.apache.org/jira/browse/KAFKA-17454
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
> Fix For: 4.0.0, 3.9.0
>
>
> The 3.2.3 release does not include KAFKA-14259, so it produces the exception 
> shown below when the log level is DEBUG. Hence, we should change the log level 
> from DEBUG to INFO for 3.2.3.
> For example, add `self.kafka.log_level = "INFO"`.
> {code:java}
> Caused by: java.lang.ClassCastException: java.util.HashMap$Node cannot be 
> cast to java.lang.Comparable
>   at 
> java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:47)
>   at java.util.TimSort.countRunAndMakeAscending(TimSort.java:355)
>   at java.util.TimSort.sort(TimSort.java:220)
>   at java.util.Arrays.sort(Arrays.java:1512)
>   at 
> java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:353)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:483)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>   at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
>   at 
> org.apache.kafka.metadata.BrokerRegistration.toString(BrokerRegistration.java:212)
>   at java.util.Formatter$FormatSpecifier.printString(Formatter.java:2886)
>   at java.util.Formatter$FormatSpecifier.print(Formatter.java:2763)
>   at java.util.Formatter.format(Formatter.java:2520)
>   at java.util.Formatter.format(Formatter.java:2455)
>   at java.lang.String.format(String.java:2940)
>   at java.util.Optional.toString(Optional.java:346)
>   at java.lang.String.valueOf(String.java:2994)
>   at java.lang.StringBuilder.append(StringBuilder.java:136)
>   at java.util.AbstractMap.toString(AbstractMap.java:559)
>   at java.lang.String.valueOf(String.java:2994)
>   at java.lang.StringBuilder.append(StringBuilder.java:136)
>   at org.apache.kafka.image.ClusterDelta.toString(ClusterDelta.java:132)
>   at java.lang.String.valueOf(String.java:2994)
>   at java.lang.StringBuilder.append(StringBuilder.java:136)
>   at org.apache.kafka.image.MetadataDelta.toString(MetadataDelta.java:353)
>   at java.lang.String.valueOf(String.java:2994)
>   at java.lang.StringBuilder.append(StringBuilder.java:136)
>   at 
> kafka.server.metadata.BrokerMetadataListener.$anonfun$publish$1(BrokerMetadataListener.scala:274)
>   at kafka.utils.Logging.debug(Logging.scala:62)
>   at kafka.utils.Logging.debug$(Logging.scala:62)
>   at 
> kafka.server.metadata.BrokerMetadataListener.debug(BrokerMetadataListener.scala:37)
>   at 
> kafka.server.metadata.BrokerMetadataListener.kafka$server$metadata$BrokerMetadataListener$$publish(BrokerMetadataListener.scala:274)
>   at 
> kafka.server.metadata.BrokerMetadataListener$StartPublishingEvent.run(BrokerMetadataListener.scala:241)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173)
>   at java.lang.Thread.run(Thread.java:750)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17454) Fix failed transactions_mixed_versions_test.py when running with 3.2

2024-08-30 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878223#comment-17878223
 ] 

Chia-Ping Tsai commented on KAFKA-17454:


Noted: the error happens only if the number of supported features is bigger than 
1, which triggers the "sort". That means the error happens once 
`kraft.version` gets in, since there is already a feature `metadata.version`.
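
A minimal standalone repro of that mechanism (a simplified sketch, not the actual BrokerRegistration#toString code): TimSort only starts comparing once there are at least two elements, and HashMap entries are not Comparable, so a natural-order sorted() fails exactly when the feature map grows past one entry.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class SortedEntrySetRepro {
    public static void main(String[] args) {
        Map<String, Short> features = new HashMap<>();
        features.put("metadata.version", (short) 14);
        // With a single entry the sort never compares and rendering works fine.
        // The second entry makes TimSort call compare(), which throws
        // ClassCastException: HashMap$Node cannot be cast to Comparable.
        features.put("kraft.version", (short) 1);

        try {
            String rendered = features.entrySet().stream()
                    .sorted() // KAFKA-14259 replaces this with a key-based comparator
                    .map(Object::toString)
                    .collect(Collectors.joining(", "));
            System.out.println(rendered);
        } catch (ClassCastException e) {
            System.out.println("boom: " + e.getMessage());
        }
    }
}
{code}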

> Fix failed transactions_mixed_versions_test.py when running with 3.2
> 
>
> Key: KAFKA-17454
> URL: https://issues.apache.org/jira/browse/KAFKA-17454
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> The 3.2.3 release does not include KAFKA-14259, so it produces the exception 
> shown below when the log level is DEBUG. Hence, we should change the log level 
> from DEBUG to INFO for 3.2.3.
> For example, add `self.kafka.log_level = "INFO"`.
> {code:java}
> Caused by: java.lang.ClassCastException: java.util.HashMap$Node cannot be 
> cast to java.lang.Comparable
>   at 
> java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:47)
>   at java.util.TimSort.countRunAndMakeAscending(TimSort.java:355)
>   at java.util.TimSort.sort(TimSort.java:220)
>   at java.util.Arrays.sort(Arrays.java:1512)
>   at 
> java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:353)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:483)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>   at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
>   at 
> org.apache.kafka.metadata.BrokerRegistration.toString(BrokerRegistration.java:212)
>   at java.util.Formatter$FormatSpecifier.printString(Formatter.java:2886)
>   at java.util.Formatter$FormatSpecifier.print(Formatter.java:2763)
>   at java.util.Formatter.format(Formatter.java:2520)
>   at java.util.Formatter.format(Formatter.java:2455)
>   at java.lang.String.format(String.java:2940)
>   at java.util.Optional.toString(Optional.java:346)
>   at java.lang.String.valueOf(String.java:2994)
>   at java.lang.StringBuilder.append(StringBuilder.java:136)
>   at java.util.AbstractMap.toString(AbstractMap.java:559)
>   at java.lang.String.valueOf(String.java:2994)
>   at java.lang.StringBuilder.append(StringBuilder.java:136)
>   at org.apache.kafka.image.ClusterDelta.toString(ClusterDelta.java:132)
>   at java.lang.String.valueOf(String.java:2994)
>   at java.lang.StringBuilder.append(StringBuilder.java:136)
>   at org.apache.kafka.image.MetadataDelta.toString(MetadataDelta.java:353)
>   at java.lang.String.valueOf(String.java:2994)
>   at java.lang.StringBuilder.append(StringBuilder.java:136)
>   at 
> kafka.server.metadata.BrokerMetadataListener.$anonfun$publish$1(BrokerMetadataListener.scala:274)
>   at kafka.utils.Logging.debug(Logging.scala:62)
>   at kafka.utils.Logging.debug$(Logging.scala:62)
>   at 
> kafka.server.metadata.BrokerMetadataListener.debug(BrokerMetadataListener.scala:37)
>   at 
> kafka.server.metadata.BrokerMetadataListener.kafka$server$metadata$BrokerMetadataListener$$publish(BrokerMetadataListener.scala:274)
>   at 
> kafka.server.metadata.BrokerMetadataListener$StartPublishingEvent.run(BrokerMetadataListener.scala:241)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173)
>   at java.lang.Thread.run(Thread.java:750)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17454) Fix failed transactions_mixed_versions_test.py when running with 3.2

2024-08-30 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-17454:
---
Priority: Blocker  (was: Major)

> Fix failed transactions_mixed_versions_test.py when running with 3.2
> 
>
> Key: KAFKA-17454
> URL: https://issues.apache.org/jira/browse/KAFKA-17454
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 4.0.0, 3.9.0
>
>
> The 3.2.3 release does not include KAFKA-14259, so it produces the exception 
> shown below when the log level is DEBUG. Hence, we should change the log level 
> from DEBUG to INFO for 3.2.3.
> For example, add `self.kafka.log_level = "INFO"`.
> {code:java}
> Caused by: java.lang.ClassCastException: java.util.HashMap$Node cannot be 
> cast to java.lang.Comparable
>   at 
> java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:47)
>   at java.util.TimSort.countRunAndMakeAscending(TimSort.java:355)
>   at java.util.TimSort.sort(TimSort.java:220)
>   at java.util.Arrays.sort(Arrays.java:1512)
>   at 
> java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:353)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:483)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>   at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
>   at 
> org.apache.kafka.metadata.BrokerRegistration.toString(BrokerRegistration.java:212)
>   at java.util.Formatter$FormatSpecifier.printString(Formatter.java:2886)
>   at java.util.Formatter$FormatSpecifier.print(Formatter.java:2763)
>   at java.util.Formatter.format(Formatter.java:2520)
>   at java.util.Formatter.format(Formatter.java:2455)
>   at java.lang.String.format(String.java:2940)
>   at java.util.Optional.toString(Optional.java:346)
>   at java.lang.String.valueOf(String.java:2994)
>   at java.lang.StringBuilder.append(StringBuilder.java:136)
>   at java.util.AbstractMap.toString(AbstractMap.java:559)
>   at java.lang.String.valueOf(String.java:2994)
>   at java.lang.StringBuilder.append(StringBuilder.java:136)
>   at org.apache.kafka.image.ClusterDelta.toString(ClusterDelta.java:132)
>   at java.lang.String.valueOf(String.java:2994)
>   at java.lang.StringBuilder.append(StringBuilder.java:136)
>   at org.apache.kafka.image.MetadataDelta.toString(MetadataDelta.java:353)
>   at java.lang.String.valueOf(String.java:2994)
>   at java.lang.StringBuilder.append(StringBuilder.java:136)
>   at 
> kafka.server.metadata.BrokerMetadataListener.$anonfun$publish$1(BrokerMetadataListener.scala:274)
>   at kafka.utils.Logging.debug(Logging.scala:62)
>   at kafka.utils.Logging.debug$(Logging.scala:62)
>   at 
> kafka.server.metadata.BrokerMetadataListener.debug(BrokerMetadataListener.scala:37)
>   at 
> kafka.server.metadata.BrokerMetadataListener.kafka$server$metadata$BrokerMetadataListener$$publish(BrokerMetadataListener.scala:274)
>   at 
> kafka.server.metadata.BrokerMetadataListener$StartPublishingEvent.run(BrokerMetadataListener.scala:241)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173)
>   at java.lang.Thread.run(Thread.java:750)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Update README.md [kafka-merge-queue-sandbox]

2024-08-30 Thread via GitHub


mumrah merged PR #43:
URL: https://github.com/apache/kafka-merge-queue-sandbox/pull/43


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-17454) Fix failed transactions_mixed_versions_test.py when running with 3.2

2024-08-30 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-17454:
---
Fix Version/s: (was: 3.9.0)

> Fix failed transactions_mixed_versions_test.py when running with 3.2
> 
>
> Key: KAFKA-17454
> URL: https://issues.apache.org/jira/browse/KAFKA-17454
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 4.0.0
>
>
> The 3.2.3 release does not include KAFKA-14259, so it produces the exception 
> shown below when the log level is DEBUG. Hence, we should change the log level 
> from DEBUG to INFO for 3.2.3.
> For example, add `self.kafka.log_level = "INFO"`.
> {code:java}
> Caused by: java.lang.ClassCastException: java.util.HashMap$Node cannot be 
> cast to java.lang.Comparable
>   at 
> java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:47)
>   at java.util.TimSort.countRunAndMakeAscending(TimSort.java:355)
>   at java.util.TimSort.sort(TimSort.java:220)
>   at java.util.Arrays.sort(Arrays.java:1512)
>   at 
> java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:353)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:483)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>   at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
>   at 
> org.apache.kafka.metadata.BrokerRegistration.toString(BrokerRegistration.java:212)
>   at java.util.Formatter$FormatSpecifier.printString(Formatter.java:2886)
>   at java.util.Formatter$FormatSpecifier.print(Formatter.java:2763)
>   at java.util.Formatter.format(Formatter.java:2520)
>   at java.util.Formatter.format(Formatter.java:2455)
>   at java.lang.String.format(String.java:2940)
>   at java.util.Optional.toString(Optional.java:346)
>   at java.lang.String.valueOf(String.java:2994)
>   at java.lang.StringBuilder.append(StringBuilder.java:136)
>   at java.util.AbstractMap.toString(AbstractMap.java:559)
>   at java.lang.String.valueOf(String.java:2994)
>   at java.lang.StringBuilder.append(StringBuilder.java:136)
>   at org.apache.kafka.image.ClusterDelta.toString(ClusterDelta.java:132)
>   at java.lang.String.valueOf(String.java:2994)
>   at java.lang.StringBuilder.append(StringBuilder.java:136)
>   at org.apache.kafka.image.MetadataDelta.toString(MetadataDelta.java:353)
>   at java.lang.String.valueOf(String.java:2994)
>   at java.lang.StringBuilder.append(StringBuilder.java:136)
>   at 
> kafka.server.metadata.BrokerMetadataListener.$anonfun$publish$1(BrokerMetadataListener.scala:274)
>   at kafka.utils.Logging.debug(Logging.scala:62)
>   at kafka.utils.Logging.debug$(Logging.scala:62)
>   at 
> kafka.server.metadata.BrokerMetadataListener.debug(BrokerMetadataListener.scala:37)
>   at 
> kafka.server.metadata.BrokerMetadataListener.kafka$server$metadata$BrokerMetadataListener$$publish(BrokerMetadataListener.scala:274)
>   at 
> kafka.server.metadata.BrokerMetadataListener$StartPublishingEvent.run(BrokerMetadataListener.scala:241)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173)
>   at java.lang.Thread.run(Thread.java:750)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17454) Fix failed transactions_mixed_versions_test.py when running with 3.2

2024-08-30 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-17454:
---
Fix Version/s: 3.9.0

> Fix failed transactions_mixed_versions_test.py when running with 3.2
> 
>
> Key: KAFKA-17454
> URL: https://issues.apache.org/jira/browse/KAFKA-17454
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 4.0.0, 3.9.0
>
>
> The 3.2.3 release does not include KAFKA-14259, so it produces the exception 
> shown below when the log level is DEBUG. Hence, we should change the log level 
> from DEBUG to INFO for 3.2.3.
> For example, add `self.kafka.log_level = "INFO"`.
> {code:java}
> Caused by: java.lang.ClassCastException: java.util.HashMap$Node cannot be 
> cast to java.lang.Comparable
>   at 
> java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:47)
>   at java.util.TimSort.countRunAndMakeAscending(TimSort.java:355)
>   at java.util.TimSort.sort(TimSort.java:220)
>   at java.util.Arrays.sort(Arrays.java:1512)
>   at 
> java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:353)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:483)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>   at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
>   at 
> org.apache.kafka.metadata.BrokerRegistration.toString(BrokerRegistration.java:212)
>   at java.util.Formatter$FormatSpecifier.printString(Formatter.java:2886)
>   at java.util.Formatter$FormatSpecifier.print(Formatter.java:2763)
>   at java.util.Formatter.format(Formatter.java:2520)
>   at java.util.Formatter.format(Formatter.java:2455)
>   at java.lang.String.format(String.java:2940)
>   at java.util.Optional.toString(Optional.java:346)
>   at java.lang.String.valueOf(String.java:2994)
>   at java.lang.StringBuilder.append(StringBuilder.java:136)
>   at java.util.AbstractMap.toString(AbstractMap.java:559)
>   at java.lang.String.valueOf(String.java:2994)
>   at java.lang.StringBuilder.append(StringBuilder.java:136)
>   at org.apache.kafka.image.ClusterDelta.toString(ClusterDelta.java:132)
>   at java.lang.String.valueOf(String.java:2994)
>   at java.lang.StringBuilder.append(StringBuilder.java:136)
>   at org.apache.kafka.image.MetadataDelta.toString(MetadataDelta.java:353)
>   at java.lang.String.valueOf(String.java:2994)
>   at java.lang.StringBuilder.append(StringBuilder.java:136)
>   at 
> kafka.server.metadata.BrokerMetadataListener.$anonfun$publish$1(BrokerMetadataListener.scala:274)
>   at kafka.utils.Logging.debug(Logging.scala:62)
>   at kafka.utils.Logging.debug$(Logging.scala:62)
>   at 
> kafka.server.metadata.BrokerMetadataListener.debug(BrokerMetadataListener.scala:37)
>   at 
> kafka.server.metadata.BrokerMetadataListener.kafka$server$metadata$BrokerMetadataListener$$publish(BrokerMetadataListener.scala:274)
>   at 
> kafka.server.metadata.BrokerMetadataListener$StartPublishingEvent.run(BrokerMetadataListener.scala:241)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173)
>   at java.lang.Thread.run(Thread.java:750)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR Handle test re-runs in junit.py [kafka]

2024-08-30 Thread via GitHub


mumrah commented on code in PR #17034:
URL: https://github.com/apache/kafka/pull/17034#discussion_r1739299366


##
.github/scripts/junit.py:
##
@@ -148,29 +168,73 @@ def pretty_time_duration(seconds: float) -> str:
             total_failures += suite.failures
             total_errors += suite.errors
             total_time += suite.time
-            for test_failure in suite.test_failures:
+
+            # Due to how the Develocity Test Retry plugin interacts with our generated ClusterTests, we can see
+            # tests pass and then fail in the same run. Because of this, we need to capture all passed and all
+            # failed for each suite. Then we can find flakes by taking the intersection of those two.
+            all_suite_passed = {test.key() for test in suite.passed_tests}
+            all_suite_failed = {test.key() for test in suite.failed_tests}
+            flaky = all_suite_passed & all_suite_failed
+            total_flaky += len(flaky)
+
+            # Display failures first
+            for test_failure in suite.failed_tests:
+                if test_failure.key() in flaky:
+                    continue
                 logger.debug(f"Found test failure: {test_failure}")
                 simple_class_name = test_failure.class_name.split(".")[-1]
-                table.append(("❌", simple_class_name, test_failure.test_name, test_failure.failure_message, f"{test_failure.time:0.2f}s"))
+                failed_table.append((simple_class_name, test_failure.test_name, test_failure.failure_message, f"{test_failure.time:0.2f}s"))
+            for test_failure in suite.failed_tests:
+                if test_failure.key() not in flaky:
+                    continue
+                logger.debug(f"Found flaky test: {test_failure}")
+                simple_class_name = test_failure.class_name.split(".")[-1]
+                flaky_table.append((simple_class_name, test_failure.test_name, test_failure.failure_message, f"{test_failure.time:0.2f}s"))
             for skipped_test in suite.skipped_tests:
                 simple_class_name = skipped_test.class_name.split(".")[-1]
                 logger.debug(f"Found skipped test: {skipped_test}")
-                table.append(("⚠️", simple_class_name, skipped_test.test_name, "Skipped", ""))
+                skipped_table.append((simple_class_name, skipped_test.test_name))
     duration = pretty_time_duration(total_time)
+    logger.info(f"Finished processing {len(reports)} reports")
 
     # Print summary
     report_url = get_env("REPORT_URL")
    report_md = f"Download [HTML report]({report_url})."
-    summary = f"{total_tests} tests run in {duration}, {total_failures} failed ❌, {total_skipped} skipped ⚠️, {total_errors} errors."
-    logger.debug(summary)
+    summary = f"{total_tests} tests run in {duration}, {total_failures} {FAILED}, {total_flaky} {FLAKY}, {total_skipped} {SKIPPED}, and {total_errors} errors."

Review Comment:
   Thanks, I noticed this too. Latest commit should fix this
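
   The change above marks a test as flaky when it shows up in both the passed set and the failed set of the same run. A minimal, self-contained sketch of that set-intersection idea, in Java for consistency with the other examples in this digest (FlakyIntersection, passedKeys, and failedKeys are illustrative names, not the script's actual identifiers):

{code}
import java.util.HashSet;
import java.util.Set;

public class FlakyIntersection {
    public static void main(String[] args) {
        // Keys shaped like test.key() output: "ClassName.testName".
        Set<String> passedKeys = Set.of("ClusterTestA.testFoo", "ClusterTestB.testBar");
        Set<String> failedKeys = Set.of("ClusterTestB.testBar", "ClusterTestC.testBaz");

        // A test that both passed and failed in the same run is flaky.
        Set<String> flaky = new HashSet<>(passedKeys);
        flaky.retainAll(failedKeys);              // set intersection

        // Hard failures are the failed tests that never passed.
        Set<String> hardFailures = new HashSet<>(failedKeys);
        hardFailures.removeAll(flaky);

        System.out.println("flaky=" + flaky + ", failed=" + hardFailures);
    }
}
{code}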



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Testing 2 [kafka]

2024-08-30 Thread via GitHub


mumrah opened a new pull request, #17062:
URL: https://github.com/apache/kafka/pull/17062

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17442: Handled persister errors with write async calls (KIP-932) [kafka]

2024-08-30 Thread via GitHub


junrao commented on code in PR #16956:
URL: https://github.com/apache/kafka/pull/16956#discussion_r1739379496


##
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##
@@ -4191,11 +4337,11 @@ public void testAcknowledgeSubsetWithAnotherMember() {
         Collections.singletonList(new ShareAcknowledgementBatch(5, 7, Collections.singletonList((byte) 1))));
 
         // Acknowledge subset with another member.
-        CompletableFuture<Optional<Throwable>> ackResult = sharePartition.acknowledge("member-2",
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge("member-2",
             Collections.singletonList(new ShareAcknowledgementBatch(9, 11, Collections.singletonList((byte) 1))));
-        assertFalse(ackResult.isCompletedExceptionally());
-        assertTrue(ackResult.join().isPresent());
-        assertEquals(InvalidRecordStateException.class, ackResult.join().get().getClass());
+        //assertNull(ackResult.join());

Review Comment:
   Should we remove this code?
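
   For context on why these assertions changed: the diff moves acknowledge() from a value-carrying CompletableFuture<Optional<Throwable>> to a CompletableFuture<Void> that completes exceptionally on error. A minimal sketch of the two contracts under that reading of the diff (AckContractSketch and its local InvalidRecordStateException stand-in are hypothetical, not the Kafka classes):

{code}
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class AckContractSketch {
    // Local stand-in for Kafka's InvalidRecordStateException.
    static class InvalidRecordStateException extends RuntimeException {
        InvalidRecordStateException(String message) { super(message); }
    }

    public static void main(String[] args) {
        // Old contract: the future completes normally and carries the error as a value.
        CompletableFuture<Optional<Throwable>> oldStyle = CompletableFuture.completedFuture(
                Optional.of(new InvalidRecordStateException("ack by another member")));
        System.out.println("old style: " + oldStyle.join().get().getClass().getSimpleName());

        // New contract: success yields join() == null; failure completes the future
        // exceptionally, so join() throws a CompletionException wrapping the cause.
        CompletableFuture<Void> newStyle = new CompletableFuture<>();
        newStyle.completeExceptionally(new InvalidRecordStateException("ack by another member"));
        try {
            newStyle.join();
        } catch (CompletionException e) {
            System.out.println("new style: " + e.getCause().getClass().getSimpleName());
        }
    }
}
{code}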



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17400: Added share fetch purgatory for delaying share fetch requests which cannot be completed because of reaching record lock partition limit [kafka]

2024-08-30 Thread via GitHub


junrao commented on code in PR #16969:
URL: https://github.com/apache/kafka/pull/16969#discussion_r1739387577


##
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##
@@ -498,10 +519,17 @@ List<TopicIdPartition> cachedTopicIdPartitionsInShareSession(String groupId, Uui
         return cachedTopicIdPartitions;
     }
 
+    // Add the share fetch request to the delayed share fetch purgatory to process the fetch request if it can be
+    // completed else watch until it can be completed/timeout.
+    private void addDelayedShareFetch(DelayedShareFetch delayedShareFetch, Set<Object> keys) {
+        this.delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch, CollectionConverters.asScala(keys).toSeq());

Review Comment:
   We want to be consistent with using the instance val. In other places, we 
don't use `this`.
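
   The comment in the diff describes the usual purgatory pattern: try to complete the operation immediately, otherwise watch it under a set of keys until a state change or the timeout completes it. A minimal, self-contained sketch of that try-complete-else-watch shape (MiniPurgatory and DelayedOp are hypothetical names, not Kafka's DelayedOperationPurgatory API):

{code}
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

interface DelayedOp {
    boolean tryComplete();   // attempt to finish the operation now
    void onExpiration();     // give up once the timeout fires
}

class MiniPurgatory {
    private final Map<Object, List<DelayedOp>> watchers = new ConcurrentHashMap<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    // Mirrors the shape of addDelayedShareFetch above: complete immediately if
    // possible, otherwise register the op under every key and arm a timeout.
    void tryCompleteElseWatch(DelayedOp op, Set<Object> keys, long timeoutMs) {
        if (op.tryComplete())
            return;
        for (Object key : keys)
            watchers.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(op);
        timer.schedule(op::onExpiration, timeoutMs, TimeUnit.MILLISECONDS);
    }

    // Called when state changes for a key (e.g. record locks are released):
    // retry every watcher for that key and drop the ones that complete.
    void checkAndComplete(Object key) {
        List<DelayedOp> ops = watchers.getOrDefault(key, Collections.emptyList());
        for (DelayedOp op : ops) {       // COW list: iteration walks a snapshot
            if (op.tryComplete())
                ops.remove(op);
        }
    }
}
{code}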



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka 8850: Updated documentation to clarify fetch.min.bytes behaviour. [kafka]

2024-08-30 Thread via GitHub


mjsax commented on PR #16749:
URL: https://github.com/apache/kafka/pull/16749#issuecomment-2322298335

   \cc @lianetm for review


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


