[jira] [Commented] (KAFKA-17395) Flaky test testMissingOffsetNoResetPolicy for new consumer
[ https://issues.apache.org/jira/browse/KAFKA-17395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17877989#comment-17877989 ] PoAn Yang commented on KAFKA-17395: --- Hi [~taijuwu], yeah, like you said, if {{CoordinatorRequestManager}} sends {{FindCoordinatorRequest}} before {{MockClient#prepareResponseFrom}}, the response has no chance to match the request. However, refactor {{MockClient}} is a big effort. I think we can just move {{MockClient#prepareResponseFrom}} before {{newConsumer}}, so there will be response in list before the request. I will file a PR later. Thanks. > Flaky test testMissingOffsetNoResetPolicy for new consumer > -- > > Key: KAFKA-17395 > URL: https://issues.apache.org/jira/browse/KAFKA-17395 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor, flaky-test > > KafkaConsumerTest.testMissingOffsetNoResetPolicy is flaky when running for > the new consumer (passing consistently for the classic consumer). > Fails with : > org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. > Consumer was not able to update fetch positions on continuous calls with 0 > timeout ==> expected: but was: > It's been flaky since it was enabled for the new consumer with > [https://github.com/apache/kafka/pull/16587] > See last couple of month runs on trunk showing the flakiness: > [https://ge.apache.org/scans/tests?search.names=Git%20branch&search.rootProjectNames=kafka&search.startTimeMax=172429919&search.startTimeMin=171721440&search.timeZoneId=America%2FToronto&search.values=trunk&tests.container=org.apache.kafka.clients.consumer.KafkaConsumerTest&tests.test=testMissingOffsetNoResetPolicy(GroupProtocol)%5B2%5D] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-17395) Flaky test testMissingOffsetNoResetPolicy for new consumer
[ https://issues.apache.org/jira/browse/KAFKA-17395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] PoAn Yang reassigned KAFKA-17395: - Assignee: PoAn Yang > Flaky test testMissingOffsetNoResetPolicy for new consumer > -- > > Key: KAFKA-17395 > URL: https://issues.apache.org/jira/browse/KAFKA-17395 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: PoAn Yang >Priority: Major > Labels: consumer-threading-refactor, flaky-test > > KafkaConsumerTest.testMissingOffsetNoResetPolicy is flaky when running for > the new consumer (passing consistently for the classic consumer). > Fails with : > org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. > Consumer was not able to update fetch positions on continuous calls with 0 > timeout ==> expected: but was: > It's been flaky since it was enabled for the new consumer with > [https://github.com/apache/kafka/pull/16587] > See last couple of month runs on trunk showing the flakiness: > [https://ge.apache.org/scans/tests?search.names=Git%20branch&search.rootProjectNames=kafka&search.startTimeMax=172429919&search.startTimeMin=171721440&search.timeZoneId=America%2FToronto&search.values=trunk&tests.container=org.apache.kafka.clients.consumer.KafkaConsumerTest&tests.test=testMissingOffsetNoResetPolicy(GroupProtocol)%5B2%5D] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] Kafka 8850: Updated documentation to clarify fetch.min.bytes behaviour. [kafka]
abhi-ksolves commented on PR #16749: URL: https://github.com/apache/kafka/pull/16749#issuecomment-2320311037 Hi @mjsax, Can you review this PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-17041) Add pagination when describe large set of metadata via Admin API
[ https://issues.apache.org/jira/browse/KAFKA-17041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17877992#comment-17877992 ] Lin Siyuan commented on KAFKA-17041: hi [~omnia_h_ibrahim] , I have been following this KIP for a long time, if it passes the vote, I hope to be able to participate in the transformation of one of the interfaces, thank you. > Add pagination when describe large set of metadata via Admin API > - > > Key: KAFKA-17041 > URL: https://issues.apache.org/jira/browse/KAFKA-17041 > Project: Kafka > Issue Type: Improvement > Components: admin >Reporter: Omnia Ibrahim >Assignee: Omnia Ibrahim >Priority: Major > Attachments: image-2024-08-01-14-08-00-999.png, > image-2024-08-19-17-32-08-015.png > > > Some of the request via Admin API timeout on large cluster or cluster with > large set of specific metadata. For example OffsetFetchRequest and > DescribeLogDirsRequest timeout due to large number of partition on cluster. > Also DescribeProducersRequest and ListTransactionsRequest time out due to too > many short lived PID or too many hanging transactions > [KIP-1062: Introduce Pagination for some requests used by Admin > API|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1062%3A+Introduce+Pagination+for+some+requests+used+by+Admin+API] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17445) Kafka streams keeps rebalancing with the following reasons
[ https://issues.apache.org/jira/browse/KAFKA-17445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878006#comment-17878006 ] Bruno Cadonna commented on KAFKA-17445: --- [~rohitbobade] It is not clear to me what you tried to achieve by setting {{group.instance.id}}. Could yo please elaborate? Did you increase {{session.timeout.ms}} as described in the config definition (https://kafka.apache.org/documentation/#consumerconfigs_group.instance.id) Could you describe the exact steps? Did you delete the consumer group on the broker between the attempts? Was this a new Streams app or an existing one? > Kafka streams keeps rebalancing with the following reasons > -- > > Key: KAFKA-17445 > URL: https://issues.apache.org/jira/browse/KAFKA-17445 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.8.0 >Reporter: Rohit Bobade >Priority: Major > > We recently upgraded Kafka streams version to 3.8.0 and are seeing that the > streams app keeps rebalancing and does not process any events > We have explicitly set the config > GROUP_INSTANCE_ID_CONFIG > This is what we see on the broker logs: > [GroupCoordinator 2]: Preparing to rebalance group \{consumer-group-name} in > state PreparingRebalance with old generation 24781 (__consumer_offsets-29) > (reason: Updating metadata for static member {} with instance id {}; client > reason: rebalance failed due to UnjoinedGroupException) > We also tried to remove the GROUP_INSTANCE_ID_CONFIG but then see these logs > and rebalancing and no processing still > sessionTimeoutMs=45000, rebalanceTimeoutMs=180, > supportedProtocols=List(stream)) has left group \{groupId} through explicit > `LeaveGroup`; client reason: the consumer unsubscribed from all topics > (kafka.coordinator.group.GroupCoordinator) > other logs show: > during Stable; client reason: need to revoke partitions and re-join) > client reason: triggered followup rebalance scheduled for 0 > On the application logs we see: > 1. state being restored from changelog topic > 2. INFO org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread at state RUNNING: partitions lost due to missed rebalance. > Detected that the thread is being fenced. This implies that this thread > missed a rebalance and dropped out of the consumer group. Will close out all > assigned tasks and rejoin the consumer group. > > 3. Task Migrated exceptions > org.apache.kafka.streams.errors.TaskMigratedException: Error encountered > sending record to topic > org.apache.kafka.common.errors.InvalidProducerEpochException: Producer with > transactionalId > attempted to produce with an old epoch > Written offsets would not be recorded and no more records would be sent since > the producer is fenced, indicating the task may be migrated out; it means all > tasks belonging to this thread should be migrated. > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:306) > ~[kafka-streams-3.8.0.jar:?] > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$1(RecordCollectorImpl.java:286) > ~[kafka-streams-3.8.0.jar:?] > at > datadog.trace.instrumentation.kafka_clients.KafkaProducerCallback.onCompletion(KafkaProducerCallback.java:44) > ~[?:?] > at > org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1106) > ~[kafka-clients-3.8.0.jar:?] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17395) Flaky test testMissingOffsetNoResetPolicy for new consumer
[ https://issues.apache.org/jira/browse/KAFKA-17395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878014#comment-17878014 ] TaiJuWu commented on KAFKA-17395: - Hi [~yangpoan] , You always have better solution:) > Flaky test testMissingOffsetNoResetPolicy for new consumer > -- > > Key: KAFKA-17395 > URL: https://issues.apache.org/jira/browse/KAFKA-17395 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: PoAn Yang >Priority: Major > Labels: consumer-threading-refactor, flaky-test > > KafkaConsumerTest.testMissingOffsetNoResetPolicy is flaky when running for > the new consumer (passing consistently for the classic consumer). > Fails with : > org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. > Consumer was not able to update fetch positions on continuous calls with 0 > timeout ==> expected: but was: > It's been flaky since it was enabled for the new consumer with > [https://github.com/apache/kafka/pull/16587] > See last couple of month runs on trunk showing the flakiness: > [https://ge.apache.org/scans/tests?search.names=Git%20branch&search.rootProjectNames=kafka&search.startTimeMax=172429919&search.startTimeMin=171721440&search.timeZoneId=America%2FToronto&search.values=trunk&tests.container=org.apache.kafka.clients.consumer.KafkaConsumerTest&tests.test=testMissingOffsetNoResetPolicy(GroupProtocol)%5B2%5D] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-16448 Add timestamp to error handler context [kafka]
sebastienviale opened a new pull request, #17054: URL: https://github.com/apache/kafka/pull/17054 This PR is part of [KAFKA-16448](https://issues.apache.org/jira/browse/KAFKA-16448) which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing. This PR add the timestamp to the ErrorHandlerContext interface Jira: https://issues.apache.org/jira/browse/KAFKA-16448. Contributors @Dabz @sebastienviale @loicgreffier -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] Changed fetch queue processing to reduce the no. of locking and unlocking activity [kafka]
adixitconfluent opened a new pull request, #17055: URL: https://github.com/apache/kafka/pull/17055 ### About For the share groups fetch request processing, we have an recursive approach of dealing with individual fetch requests. While it works fine with less no. of records (< 1,000,000) and lesser sharing (< 5 share consumers), it seems that some requests are getting stuck when we increase the load and try to increase the throughput. I've replaced this approach by removing the unlocking and locking of fetch queue in between entries. This had reduced the complexity and also removes the reliability issue on increasing the load. ### Testing The code has been tested with the help of unit tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)
[ https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878022#comment-17878022 ] Chia-Ping Tsai commented on KAFKA-16792: AsyncConsumer close(0) has similar issue that it needs to wait network thread to trigger something. Hence, the “small” timeout will make the call (poll/close) leave before network thread complete the event. In summary, poll(0) could not trigger FC request when return and close(0) could not trigger rebalance listener. Both are inconsistent behavior to classic consumer. We should document the new behavior if AsyncConsumer does want to honor the timeout. > Enable consumer unit tests that fail to fetch offsets only for new consumer > with poll(0) > > > Key: KAFKA-16792 > URL: https://issues.apache.org/jira/browse/KAFKA-16792 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Lianet Magrans >Assignee: PoAn Yang >Priority: Blocker > Labels: kip-848-client-support > Fix For: 4.0.0 > > > Enable the following unit tests for the new async consumer in > KafkaConsumerTest: > - testCurrentLag > - testFetchStableOffsetThrowInPoll > - testListOffsetShouldUpdateSubscriptions > - testPollReturnsRecords > - testResetToCommittedOffset > - testResetUsingAutoResetPolicy > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17424: Memory optimisation for Kafka-connect [kafka]
ajit97singh closed pull request #17002: KAFKA-17424: Memory optimisation for Kafka-connect URL: https://github.com/apache/kafka/pull/17002 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14572: Migrate EmbeddedKafkaCluster used by Streams integration tests from EmbeddedZookeeper to KRaft [kafka]
lucasbru commented on PR #17016: URL: https://github.com/apache/kafka/pull/17016#issuecomment-2320694184 @OmniaGM Looks like something in kraft is just taking longer? Have we not observed this problem with the connect unit tests? I wonder if there are some settings that we could use to speed things up. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-17447) Changed fetch queue processing to reduce the no. of locking and unlocking activity
Abhinav Dixit created KAFKA-17447: - Summary: Changed fetch queue processing to reduce the no. of locking and unlocking activity Key: KAFKA-17447 URL: https://issues.apache.org/jira/browse/KAFKA-17447 Project: Kafka Issue Type: Sub-task Reporter: Abhinav Dixit Assignee: Abhinav Dixit For the share groups fetch request processing, we have an recursive approach of dealing with individual fetch requests. While it works fine with less no. of records (< 1,000,000) and lesser sharing (< 5 share consumers), it seems that some requests are getting stuck when we increase the load and try to increase the throughput. I've replaced this approach by removing the unlocking and locking of fetch queue in between entries. This had reduced the complexity and also removes the reliability issue on increasing the load. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16968) Make 3.8-IV0 a stable MetadataVersion and create 3.9-IV0
[ https://issues.apache.org/jira/browse/KAFKA-16968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] TaiJuWu resolved KAFKA-16968. - Resolution: Fixed > Make 3.8-IV0 a stable MetadataVersion and create 3.9-IV0 > > > Key: KAFKA-16968 > URL: https://issues.apache.org/jira/browse/KAFKA-16968 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.8.0 >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Blocker > > Before we can release 3.8, we must make 3.8-IV0 a stable MetadataVersion and > create 3.9-IV0. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17446) Kafka streams stuck in rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-17446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878040#comment-17878040 ] Bruno Cadonna commented on KAFKA-17446: --- [~rohitbobade] Could you please share some more details? Logs preferably on {{DEBUG}} level would be great! > Kafka streams stuck in rebalancing > -- > > Key: KAFKA-17446 > URL: https://issues.apache.org/jira/browse/KAFKA-17446 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.8.0 >Reporter: Rohit Bobade >Priority: Major > > Kafka streams stuck in endless rebalancing with the following error: > org.apache.kafka.streams.errors.LockException: stream-thread task [0_1] > Failed to lock the state directory for task 0_1 > org.apache.kafka.streams.processor.internals.TaskManager - stream-thread > Encountered lock exception. Reattempting locking the state in the next > iteration. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17293: New consumer HeartbeatRequestManager should rediscover disconnected coordinator [kafka]
lianetm commented on code in PR #16844: URL: https://github.com/apache/kafka/pull/16844#discussion_r1738389019 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java: ## @@ -122,6 +123,23 @@ NetworkClientDelegate.UnsentRequest makeFindCoordinatorRequest(final long curren }); } +/** + * Handles the disconnection of the current coordinator. + * This method checks if the given exception is an instance of {@link DisconnectException}. + * If so, it marks the coordinator as unknown, indicating that the client should + * attempt to discover a new coordinator. For any other exception type, no action is performed. + * + * @param exception The exception to handle, which was received as part of a request response. + * If this is an instance of {@link DisconnectException}, the coordinator is marked as unknown. + * For other types of exceptions, no action is performed. Review Comment: nit: this is already said right above, so let's just remove it from here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17293: New consumer HeartbeatRequestManager should rediscover disconnected coordinator [kafka]
frankvicky commented on code in PR #16844: URL: https://github.com/apache/kafka/pull/16844#discussion_r1738393452 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java: ## @@ -122,6 +123,23 @@ NetworkClientDelegate.UnsentRequest makeFindCoordinatorRequest(final long curren }); } +/** + * Handles the disconnection of the current coordinator. + * This method checks if the given exception is an instance of {@link DisconnectException}. + * If so, it marks the coordinator as unknown, indicating that the client should + * attempt to discover a new coordinator. For any other exception type, no action is performed. + * + * @param exception The exception to handle, which was received as part of a request response. + * If this is an instance of {@link DisconnectException}, the coordinator is marked as unknown. + * For other types of exceptions, no action is performed. Review Comment: Yes, it's a little verbose. I will remove it 👍🏼 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)
[ https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878054#comment-17878054 ] Lianet Magrans commented on KAFKA-16792: Agree with [~chia7712] 's point, we've had this same situation and is expected in all these tests that basically make an api call (app thread), and check right after that there is a request generated. For the new consumer that does not happen right away: all request are generated in the background thread, once it's has had the time to runOnce and process the event triggered by the api call. So the waitForCondition seems sensible at this test level (they are really checking the internals, so need to deal with it). On the api implementations, we cover this delay and try to hide it as much as possible by blocking with addAndGet(event) to wait for the background to process the event/requests. > Enable consumer unit tests that fail to fetch offsets only for new consumer > with poll(0) > > > Key: KAFKA-16792 > URL: https://issues.apache.org/jira/browse/KAFKA-16792 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Lianet Magrans >Assignee: PoAn Yang >Priority: Blocker > Labels: kip-848-client-support > Fix For: 4.0.0 > > > Enable the following unit tests for the new async consumer in > KafkaConsumerTest: > - testCurrentLag > - testFetchStableOffsetThrowInPoll > - testListOffsetShouldUpdateSubscriptions > - testPollReturnsRecords > - testResetToCommittedOffset > - testResetUsingAutoResetPolicy > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17442: Handled persister errors with write async calls (KIP-932) [kafka]
apoorvmittal10 commented on PR #16956: URL: https://github.com/apache/kafka/pull/16956#issuecomment-2320893368 @junrao Can I please get review on this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17442: Handled persister errors with write async calls (KIP-932) [kafka]
apoorvmittal10 commented on PR #16956: URL: https://github.com/apache/kafka/pull/16956#issuecomment-2320892038 > Thanks for the PR. I had a query, what will happen to the ongoing write state requests to the persister when you close a share partition. I understand that it is fine if some write state calls are not completed, but will cause any problems while close of `SharePartitionManager`? The broker must wait for the pending requests to be completed, so I think it should not be problem unless persister never replies back. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-17395: Flaky test testMissingOffsetNoResetPolicy for new consumer [kafka]
FrankYang0529 opened a new pull request, #17056: URL: https://github.com/apache/kafka/pull/17056 In `AsyncKafkaConsumer`, `FindCoordinatorRequest` is sent by background thread. In `MockClient#prepareResponseFrom`, it only matches the response to a future request. If there is some race condition, `FindCoordinatorResponse` may not match to a `FindCoordinatorRequest`. It's better to put `MockClient#prepareResponseFrom` before the request to avoid flaky test. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-17448) New consumer seek should update positions in background thread
Lianet Magrans created KAFKA-17448: -- Summary: New consumer seek should update positions in background thread Key: KAFKA-17448 URL: https://issues.apache.org/jira/browse/KAFKA-17448 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 3.7.1, 3.8.0, 3.7.0 Reporter: Lianet Magrans In the new AsyncKafkaConsumer, a call to seek will update the positions in subscription state for the assigned partitions in the app thread ([https://github.com/apache/kafka/blob/c23b6b0365af5c58b76d8ad3fb628f766f95348f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L796]) This could lead to race conditions like we've seen when subscription state changes in the app thread (over a set of assigned partitions), that could have been modified in the background thread, leading to errors on "No current assignment for partition " [https://github.com/apache/kafka/blob/c23b6b0365af5c58b76d8ad3fb628f766f95348f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L378] Also, positions update is moved the background with KAFKA-17066 for the same reason, so even if the assignment does not change, we could have a race between the background setting positions to the committed offsets for instance, and the app thread setting them manually via seek. To avoid all of the above, we should have seek generate an event, send it to the background, and then update the subscription state when processing that event (similar to other api calls, ex, assign with KAFKA-17064) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17448) New consumer seek should update positions in background thread
[ https://issues.apache.org/jira/browse/KAFKA-17448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878065#comment-17878065 ] Lianet Magrans commented on KAFKA-17448: Hey [~payang] , this issue is very similar to KAFKA-17064 you just fixed. If you have bandwidth this would probably be very familiar to you already :). > New consumer seek should update positions in background thread > -- > > Key: KAFKA-17448 > URL: https://issues.apache.org/jira/browse/KAFKA-17448 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 3.7.0, 3.8.0, 3.7.1 >Reporter: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > > In the new AsyncKafkaConsumer, a call to seek will update the positions in > subscription state for the assigned partitions in the app thread > ([https://github.com/apache/kafka/blob/c23b6b0365af5c58b76d8ad3fb628f766f95348f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L796]) > This could lead to race conditions like we've seen when subscription state > changes in the app thread (over a set of assigned partitions), that could > have been modified in the background thread, leading to errors on "No current > assignment for partition " > [https://github.com/apache/kafka/blob/c23b6b0365af5c58b76d8ad3fb628f766f95348f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L378] > > Also, positions update is moved the background with KAFKA-17066 for the same > reason, so even if the assignment does not change, we could have a race > between the background setting positions to the committed offsets for > instance, and the app thread setting them manually via seek. > To avoid all of the above, we should have seek generate an event, send it to > the background, and then update the subscription state when processing that > event (similar to other api calls, ex, assign with KAFKA-17064) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17448) New consumer seek should update positions in background thread
[ https://issues.apache.org/jira/browse/KAFKA-17448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-17448: --- Fix Version/s: 4.0.0 > New consumer seek should update positions in background thread > -- > > Key: KAFKA-17448 > URL: https://issues.apache.org/jira/browse/KAFKA-17448 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 3.7.0, 3.8.0, 3.7.1 >Reporter: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 4.0.0 > > > In the new AsyncKafkaConsumer, a call to seek will update the positions in > subscription state for the assigned partitions in the app thread > ([https://github.com/apache/kafka/blob/c23b6b0365af5c58b76d8ad3fb628f766f95348f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L796]) > This could lead to race conditions like we've seen when subscription state > changes in the app thread (over a set of assigned partitions), that could > have been modified in the background thread, leading to errors on "No current > assignment for partition " > [https://github.com/apache/kafka/blob/c23b6b0365af5c58b76d8ad3fb628f766f95348f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L378] > > Also, positions update is moved the background with KAFKA-17066 for the same > reason, so even if the assignment does not change, we could have a race > between the background setting positions to the committed offsets for > instance, and the app thread setting them manually via seek. > To avoid all of the above, we should have seek generate an event, send it to > the background, and then update the subscription state when processing that > event (similar to other api calls, ex, assign with KAFKA-17064) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17448) New consumer seek should update positions in background thread
[ https://issues.apache.org/jira/browse/KAFKA-17448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-17448: --- Component/s: clients > New consumer seek should update positions in background thread > -- > > Key: KAFKA-17448 > URL: https://issues.apache.org/jira/browse/KAFKA-17448 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0, 3.8.0, 3.7.1 >Reporter: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 4.0.0 > > > In the new AsyncKafkaConsumer, a call to seek will update the positions in > subscription state for the assigned partitions in the app thread > ([https://github.com/apache/kafka/blob/c23b6b0365af5c58b76d8ad3fb628f766f95348f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L796]) > This could lead to race conditions like we've seen when subscription state > changes in the app thread (over a set of assigned partitions), that could > have been modified in the background thread, leading to errors on "No current > assignment for partition " > [https://github.com/apache/kafka/blob/c23b6b0365af5c58b76d8ad3fb628f766f95348f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L378] > > Also, positions update is moved the background with KAFKA-17066 for the same > reason, so even if the assignment does not change, we could have a race > between the background setting positions to the committed offsets for > instance, and the app thread setting them manually via seek. > To avoid all of the above, we should have seek generate an event, send it to > the background, and then update the subscription state when processing that > event (similar to other api calls, ex, assign with KAFKA-17064) -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR Handle test re-runs in junit.py [kafka]
chia7712 commented on code in PR #17034: URL: https://github.com/apache/kafka/pull/17034#discussion_r1738517170 ## .github/scripts/junit.py: ## @@ -148,29 +168,73 @@ def pretty_time_duration(seconds: float) -> str: total_failures += suite.failures total_errors += suite.errors total_time += suite.time -for test_failure in suite.test_failures: + +# Due to how the Develocity Test Retry plugin interacts with our geneated ClusterTests, we can see +# tests pass and then fail in the same run. Because of this, we need to capture all passed and all +# failed for each suite. Then we can find flakes by taking the intersection of those two. +all_suite_passed = {test.key() for test in suite.passed_tests} +all_suite_failed = {test.key() for test in suite.failed_tests} +flaky = all_suite_passed & all_suite_failed +total_flaky += len(flaky) + +# Display failures first +for test_failure in suite.failed_tests: +if test_failure.key() in flaky: +continue logger.debug(f"Found test failure: {test_failure}") simple_class_name = test_failure.class_name.split(".")[-1] -table.append(("❌", simple_class_name, test_failure.test_name, test_failure.failure_message, f"{test_failure.time:0.2f}s")) +failed_table.append((simple_class_name, test_failure.test_name, test_failure.failure_message, f"{test_failure.time:0.2f}s")) +for test_failure in suite.failed_tests: +if test_failure.key() not in flaky: +continue +logger.debug(f"Found flaky test: {test_failure}") +simple_class_name = test_failure.class_name.split(".")[-1] +flaky_table.append((simple_class_name, test_failure.test_name, test_failure.failure_message, f"{test_failure.time:0.2f}s")) for skipped_test in suite.skipped_tests: simple_class_name = skipped_test.class_name.split(".")[-1] logger.debug(f"Found skipped test: {skipped_test}") -table.append(("⚠️", simple_class_name, skipped_test.test_name, "Skipped", "")) +skipped_table.append((simple_class_name, skipped_test.test_name)) duration = pretty_time_duration(total_time) +logger.info(f"Finished processing {len(reports)} reports") # Print summary report_url = get_env("REPORT_URL") report_md = f"Download [HTML report]({report_url})." -summary = f"{total_tests} tests run in {duration}, {total_failures} failed ❌, {total_skipped} skipped ⚠️, {total_errors} errors." -logger.debug(summary) +summary = f"{total_tests} tests run in {duration}, {total_failures} {FAILED}, {total_flaky} {FLAKY}, {total_skipped} {SKIPPED}, and {total_errors} errors." Review Comment: The tests shown by "Failed Tests" table exclude the "flaky". Does it confuse readers that the "number of failed" is NOT equal to the number of tests shown by "Failed Tests" table. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Set `group.coordinator.rebalance.protocols` to `classic,consumer` by default [kafka]
dajac opened a new pull request, #17057: URL: https://github.com/apache/kafka/pull/17057 This patch changes the default configuration of `group.coordinator.rebalance.protocols` to `classic,consumer`. It also updates various tests that were specifying the new default value. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Set `group.coordinator.rebalance.protocols` to `classic,consumer` by default [kafka]
dajac commented on code in PR #17057: URL: https://github.com/apache/kafka/pull/17057#discussion_r1738523290 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -576,19 +576,13 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) throw new ConfigException(s"Disabling the '${GroupType.CLASSIC}' protocol is not supported.") } if (protocols.contains(GroupType.CONSUMER)) { - if (processRoles.isEmpty) { Review Comment: I had to soften the validation here because ZK also has the new default value now. Keep in mind that ZK will be remove entirely in 4.0 so it won't be an issue for the next release. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Fix broken output layout of kafka-consumer-groups.sh [kafka]
sasakitoa opened a new pull request, #17058: URL: https://github.com/apache/kafka/pull/17058 Output format of `kafka-consumer-groups.sh` with `--delete-offsets` or `--reset-offsets` has been broken since line separators are missing. This PR will fix the problem. -- Current: ``` $ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete-offsets --topic topic1 --group TestGroup1 Request succeed for deleting offsets with topic topic1 group TestGroup1 TOPIC PARTITION STATUS topic1 0 Successful topic1 1 Successful topic1 2 Successful topic1 3 Successful topic1 4 Successful topic1 5 Successful topic1 6 Successful topic1 7 Successful topic1 8 Successful topic1 9 Successful ``` Expect: ``` $ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete-offsets --topic topic1 --group TestGroup1 Request succeed for deleting offsets with topic topic1 group TestGroup1 TOPIC PARTITION STATUS topic1 0 Successful topic1 1 Successful topic1 2 Successful topic1 3 Successful topic1 4 Successful topic1 5 Successful topic1 6 Successful topic1 7 Successful topic1 8 Successful topic1 9 Successful ``` -- Current: ``` $ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --from-file /tmp/offsets.csv --group TestGroup1 --execute GROUP TOPIC PARTITION NEW-OFFSET TestGroup1 topic1 1 20 TestGroup1 topic1 0 10 TestGroup1 topic1 3 40 TestGroup1 topic1 2 30 TestGroup1 topic1 5 60 TestGroup1 topic1 4 50 ``` Expect: ``` $ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --from-file /tmp/offsets.csv --group TestGroup1 --execute GROUP TOPIC PARTITION NEW-OFFSET TestGroup1 topic1 1 20 TestGroup1 topic1 0 10 TestGroup1 topic1 3 40 TestGroup1 topic1 2 30 TestGroup1 topic1 5 60 TestGroup1 topic1 4 50 ``` ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17433 Add a deflake Github action [kafka]
chia7712 commented on code in PR #17019: URL: https://github.com/apache/kafka/pull/17019#discussion_r1738533619 ## core/src/test/java/kafka/test/junit/ClusterTestExtensions.java: ## @@ -199,46 +239,53 @@ private List processClusterTests(ExtensionContext return ret; } -private List processClusterTest(ExtensionContext context, ClusterTest annot, ClusterTestDefaults defaults) { -List ret = processClusterTestInternal(context, annot, defaults); +private List processClusterTestInternal( +ExtensionContext context, +ClusterTest clusterTest, +ClusterTestDefaults defaults +) { +Type[] types = clusterTest.types().length == 0 ? defaults.types() : clusterTest.types(); +Map serverProperties = Stream.concat(Arrays.stream(defaults.serverProperties()), Arrays.stream(clusterTest.serverProperties())) +.filter(e -> e.id() == -1) +.collect(Collectors.toMap(ClusterConfigProperty::key, ClusterConfigProperty::value, (a, b) -> b)); -if (ret.isEmpty()) { -throw new IllegalStateException("processClusterTest method should provide at least one config"); -} +Map> perServerProperties = Stream.concat(Arrays.stream(defaults.serverProperties()), Arrays.stream(clusterTest.serverProperties())) +.filter(e -> e.id() != -1) +.collect(Collectors.groupingBy(ClusterConfigProperty::id, Collectors.mapping(Function.identity(), +Collectors.toMap(ClusterConfigProperty::key, ClusterConfigProperty::value, (a, b) -> b; -return ret; -} -private List processClusterTestInternal(ExtensionContext context, ClusterTest annot, ClusterTestDefaults defaults) { -Type[] types = annot.types().length == 0 ? defaults.types() : annot.types(); -Map serverProperties = Stream.concat(Arrays.stream(defaults.serverProperties()), Arrays.stream(annot.serverProperties())) -.filter(e -> e.id() == -1) -.collect(Collectors.toMap(ClusterConfigProperty::key, ClusterConfigProperty::value, (a, b) -> b)); +Map features = Arrays.stream(clusterTest.features()) +.collect(Collectors.toMap(ClusterFeature::feature, ClusterFeature::version)); -Map> perServerProperties = Stream.concat(Arrays.stream(defaults.serverProperties()), Arrays.stream(annot.serverProperties())) -.filter(e -> e.id() != -1) -.collect(Collectors.groupingBy(ClusterConfigProperty::id, Collectors.mapping(Function.identity(), -Collectors.toMap(ClusterConfigProperty::key, ClusterConfigProperty::value, (a, b) -> b; +ClusterConfig config = ClusterConfig.builder() +.setTypes(new HashSet<>(Arrays.asList(types))) +.setBrokers(clusterTest.brokers() == 0 ? defaults.brokers() : clusterTest.brokers()) +.setControllers(clusterTest.controllers() == 0 ? defaults.controllers() : clusterTest.controllers()) +.setDisksPerBroker(clusterTest.disksPerBroker() == 0 ? defaults.disksPerBroker() : clusterTest.disksPerBroker()) +.setAutoStart(clusterTest.autoStart() == AutoStart.DEFAULT ? defaults.autoStart() : clusterTest.autoStart() == AutoStart.YES) +.setListenerName(clusterTest.listener().trim().isEmpty() ? null : clusterTest.listener()) +.setServerProperties(serverProperties) +.setPerServerProperties(perServerProperties) +.setSecurityProtocol(clusterTest.securityProtocol()) +.setMetadataVersion(clusterTest.metadataVersion()) +.setTags(Arrays.asList(clusterTest.tags())) +.setFeatures(features) +.build(); + +return Arrays.stream(types) +.map(type -> type.invocationContexts(context.getRequiredTestMethod().getName(), config)) +.collect(Collectors.toList()); +} -Map features = Arrays.stream(annot.features()) -.collect(Collectors.toMap(ClusterFeature::feature, ClusterFeature::version)); +Stream repeatedClusterTests(int repeatCount, ClusterTest[] clusterTestAnnots) { Review Comment: this is unused now. ## .github/README.md: ## @@ -0,0 +1,40 @@ +# GitHub Actions + +## Overview + +The entry point for our build is the "CI" workflow which is define in ci.yml. +This is used for both PR and trunk builds. The jobs and steps of the workflow +are defined in build.yml. + +## Opting-in to GitHub Actions + +To opt-in to the new GitHub actions workflows, simply name your branch with a +prefix of "gh-". For example, `gh-KAFKA-17433-deflake` + +## Disabling Email Notifications + +By default, GitHub sends an email for each failed action run. To change this, +visit https://github.com/settings/notifications and find System -> Actions. +Here you can change your notification preferences. + +## Publishing Build Scans + +> This only works for com
Re: [PR] MINOR: Set `group.coordinator.rebalance.protocols` to `classic,consumer` by default [kafka]
chia7712 commented on code in PR #17057: URL: https://github.com/apache/kafka/pull/17057#discussion_r1738568876 ## core/src/test/java/kafka/test/ClusterInstance.java: ## @@ -160,15 +159,11 @@ default Admin createAdminClient() { } default Set supportedGroupProtocols() { -Map serverProperties = config().serverProperties(); -Set supportedGroupProtocols = new HashSet<>(); -supportedGroupProtocols.add(CLASSIC); - -if (serverProperties.getOrDefault(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "").contains("consumer")) { -supportedGroupProtocols.add(CONSUMER); +if (isKRaftTest()) { Review Comment: The `CONSUMER` protocol can be disabled even though it is kraft mode, right? Maybe we can check the `KafkaAPIs` directly. for example: ```java default Set supportedGroupProtocols() { if (brokers().values().stream().allMatch(b -> b.dataPlaneRequestProcessor().isConsumerGroupProtocolEnabled())) { return mkSet(CLASSIC, CONSUMER); } else { return Collections.singleton(CLASSIC); } } ``` ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java: ## @@ -76,9 +76,9 @@ public class GroupCoordinatorConfig { public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG = "group.coordinator.rebalance.protocols"; public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC = "The list of enabled rebalance protocols. Supported protocols: " + Arrays.stream(Group.GroupType.values()).map(Group.GroupType::toString).collect(Collectors.joining(",")) + ". " + -"The " + Group.GroupType.CONSUMER + " rebalance protocol is in preview and therefore must not be used in production. " + "The " + Group.GroupType.SHARE + " rebalance protocol is in early access and therefore must not be used in production."; -public static final List GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = Collections.singletonList(Group.GroupType.CLASSIC.toString()); +public static final List GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = +Arrays.asList(Group.GroupType.CLASSIC.toString(), Group.GroupType.CONSUMER.toString()); Review Comment: Please make it immutable. ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -576,19 +576,13 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) throw new ConfigException(s"Disabling the '${GroupType.CLASSIC}' protocol is not supported.") } if (protocols.contains(GroupType.CONSUMER)) { - if (processRoles.isEmpty) { -throw new ConfigException(s"The new '${GroupType.CONSUMER}' rebalance protocol is only supported in KRaft cluster.") - } - if (!isNewGroupCoordinatorEnabled) { -throw new ConfigException(s"The new '${GroupType.CONSUMER}' rebalance protocol is only supported by the new group coordinator.") + if (processRoles.isEmpty || !isNewGroupCoordinatorEnabled) { +warn(s"The new '${GroupType.CONSUMER}' rebalance protocol is only supported in KRaft cluster with the new group coordinator.") Review Comment: Maybe `KafkaAPIs#isConsumerGroupProtocolEnabled` needs similar log when the `isConsumerRebalanceProtocolSupported` return false and `CONSUMER` protocol is enabled. https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L3816 ## core/src/test/java/kafka/test/ClusterTestExtensionsTest.java: ## @@ -191,45 +189,17 @@ public void testDefaults(ClusterInstance clusterInstance) { Assertions.assertEquals(MetadataVersion.latestTesting(), clusterInstance.config().metadataVersion()); } -@ClusterTests({ -@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = { -@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), -}), -@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = { -@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), -@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), -}) -}) +@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}) public void testSupportedNewGroupProtocols(ClusterInstance clusterInstance) { Set supportedGroupProtocols = new HashSet<>(); supportedGroupProtocols.add(CLASSIC); supportedGroupProtocols.add(CONSUMER); - Assertions.assertTrue(clusterInstance.supportedGroupProtocols().containsAll(supportedGroupProtocols)); -Assertions.assertEquals(2, clusterInstance.supportedGroupProtocols().size()); +Assertions.assertEquals(supportedGroupProtocols, clusterInstance.supportedGroupProtocols()); } -@Cluster
Re: [PR] MINOR: add helper function for clusterInstance [kafka]
chia7712 commented on code in PR #16852: URL: https://github.com/apache/kafka/pull/16852#discussion_r1738595734 ## core/src/test/java/kafka/test/ClusterTestExtensionsTest.java: ## @@ -236,6 +236,20 @@ public void testNotSupportedNewGroupProtocols(ClusterInstance clusterInstance) { Assertions.assertEquals(1, clusterInstance.supportedGroupProtocols().size()); } + + +@ClusterTest(types = {Type.ZK, Type.CO_KRAFT, Type.KRAFT}, brokers = 3) +public void testCreateTopic(ClusterInstance clusterInstance) throws Exception { +String topicName = "test"; +int partitions = 3; +short replicas = 3; +clusterInstance.createTopic(topicName, partitions, replicas); + +try (Admin admin = clusterInstance.createAdminClient()) { + Assertions.assertTrue(admin.listTopics().listings().get().stream().anyMatch(s -> s.name().equals(topicName))); Review Comment: Please check the partition and replica also -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Set `group.coordinator.rebalance.protocols` to `classic,consumer` by default [kafka]
dajac commented on code in PR #17057: URL: https://github.com/apache/kafka/pull/17057#discussion_r1738609128 ## core/src/test/java/kafka/test/ClusterInstance.java: ## @@ -160,15 +159,11 @@ default Admin createAdminClient() { } default Set supportedGroupProtocols() { -Map serverProperties = config().serverProperties(); -Set supportedGroupProtocols = new HashSet<>(); -supportedGroupProtocols.add(CLASSIC); - -if (serverProperties.getOrDefault(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "").contains("consumer")) { -supportedGroupProtocols.add(CONSUMER); +if (isKRaftTest()) { Review Comment: Good idea. Let me try this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Set `group.coordinator.rebalance.protocols` to `classic,consumer` by default [kafka]
dajac commented on code in PR #17057: URL: https://github.com/apache/kafka/pull/17057#discussion_r1738610295 ## core/src/test/java/kafka/test/ClusterTestExtensionsTest.java: ## @@ -191,45 +189,17 @@ public void testDefaults(ClusterInstance clusterInstance) { Assertions.assertEquals(MetadataVersion.latestTesting(), clusterInstance.config().metadataVersion()); } -@ClusterTests({ -@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = { -@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), -}), -@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = { -@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), -@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), -}) -}) +@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}) public void testSupportedNewGroupProtocols(ClusterInstance clusterInstance) { Set supportedGroupProtocols = new HashSet<>(); supportedGroupProtocols.add(CLASSIC); supportedGroupProtocols.add(CONSUMER); - Assertions.assertTrue(clusterInstance.supportedGroupProtocols().containsAll(supportedGroupProtocols)); -Assertions.assertEquals(2, clusterInstance.supportedGroupProtocols().size()); +Assertions.assertEquals(supportedGroupProtocols, clusterInstance.supportedGroupProtocols()); } -@ClusterTests({ -@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = { -@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), -}), -@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = { -@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"), -}), -@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = { Review Comment: It did not work with my implementation but it would work with yours. Let me bring it back. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Set `group.coordinator.rebalance.protocols` to `classic,consumer` by default [kafka]
dajac commented on code in PR #17057: URL: https://github.com/apache/kafka/pull/17057#discussion_r1738610851 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java: ## @@ -79,34 +77,34 @@ private ConsumerGroupCommandTestUtils() { } static List generator() { -return Stream.concat(forConsumerGroupCoordinator().stream(), forClassicGroupCoordinator().stream()) -.collect(Collectors.toList()); +return Stream +.concat(forKRaftGroupCoordinator().stream(), forZkGroupCoordinator().stream()) +.collect(Collectors.toList()); } -static List forConsumerGroupCoordinator() { +static List forKRaftGroupCoordinator() { Map serverProperties = new HashMap<>(); serverProperties.put(OFFSETS_TOPIC_PARTITIONS_CONFIG, "1"); serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1"); serverProperties.put(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true"); Review Comment: I was debating whether I should remove those in this PR. I can do it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Set `group.coordinator.rebalance.protocols` to `classic,consumer` by default [kafka]
dajac commented on code in PR #17057: URL: https://github.com/apache/kafka/pull/17057#discussion_r1738612665 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -576,19 +576,13 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) throw new ConfigException(s"Disabling the '${GroupType.CLASSIC}' protocol is not supported.") } if (protocols.contains(GroupType.CONSUMER)) { - if (processRoles.isEmpty) { -throw new ConfigException(s"The new '${GroupType.CONSUMER}' rebalance protocol is only supported in KRaft cluster.") - } - if (!isNewGroupCoordinatorEnabled) { -throw new ConfigException(s"The new '${GroupType.CONSUMER}' rebalance protocol is only supported by the new group coordinator.") + if (processRoles.isEmpty || !isNewGroupCoordinatorEnabled) { +warn(s"The new '${GroupType.CONSUMER}' rebalance protocol is only supported in KRaft cluster with the new group coordinator.") Review Comment: Hum... This is too much in my opinion because it would log on every calls from the consumers. Having a warning at the beginning seems enough for me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-17428) remote segments deleted in RLMCopyTask stays `COPY_SEGMENT_START` state
[ https://issues.apache.org/jira/browse/KAFKA-17428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Federico Valeri updated KAFKA-17428: Labels: kip-required (was: ) > remote segments deleted in RLMCopyTask stays `COPY_SEGMENT_START` state > --- > > Key: KAFKA-17428 > URL: https://issues.apache.org/jira/browse/KAFKA-17428 > Project: Kafka > Issue Type: Improvement >Reporter: Luke Chen >Assignee: Federico Valeri >Priority: Major > Labels: kip-required > > Currently, we will delete failed uploaded segment and Custom metadata size > exceeded segments in copyLogSegment in RLMCopyTask. But after deletion, these > segment states are still in COPY_SEGMENT_STARTED. That "might" cause > unexpected issues in the future. We'd better to move the state from > {{COPY_SEGMENT_STARTED}} -> {{DELETE_SEGMENT_STARTED}} -> > {{DELETE_SEGMENT_FINISHED}} > > updated: > I thought about this when I first had a look at it and one thing that > bothered me is that {{DELETE_SEGMENT_STARTED}} means to me that we're now in > a state where we attempt deletion. However if the remote store is down and we > fail to copy and delete we will leave that segment in > {{DELETE_SEGMENT_STARTED}} and not attempt to delete it till the segment > itself breaches retention.ms/bytes. > We can probably just make it clearer but that was my thought at the time. > So, maybe when in deletion loop, we can add {{DELETE_SEGMENT_STARTED}} > segments into deletion directly, but that also needs to consider the > retention size calculation. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] Kafka 12822 2 [kafka]
pegasas opened a new pull request, #17059: URL: https://github.com/apache/kafka/pull/17059 KAFKA-12829: Remove deprecated StreamsBuilder#addGlobalStore of old Processor API ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-17449) Move Quota classes to server module
Mickael Maison created KAFKA-17449: -- Summary: Move Quota classes to server module Key: KAFKA-17449 URL: https://issues.apache.org/jira/browse/KAFKA-17449 Project: Kafka Issue Type: Sub-task Reporter: Mickael Maison Assignee: Mickael Maison -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] Kafka 12822 2 [kafka]
pegasas commented on PR #17059: URL: https://github.com/apache/kafka/pull/17059#issuecomment-2321259947  CI passed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16379: Coordinator event queue, processing, flush, purgatory time histograms [kafka]
jeffkbkim commented on PR #16949: URL: https://github.com/apache/kafka/pull/16949#issuecomment-2321260689 The HdrHistogram wrapper implementation (HdrHistogram, KafkaMetricHistogram) was authored by @dimitarndimitrov -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-17450) Optimise the handler methods in ShareConsumeRequestManager.
[ https://issues.apache.org/jira/browse/KAFKA-17450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shivsundar R reassigned KAFKA-17450: Assignee: Shivsundar R > Optimise the handler methods in ShareConsumeRequestManager. > --- > > Key: KAFKA-17450 > URL: https://issues.apache.org/jira/browse/KAFKA-17450 > Project: Kafka > Issue Type: Sub-task >Reporter: Shivsundar R >Assignee: Shivsundar R >Priority: Major > > Currently there are 4 handler functions for ShareAcknowledge responses. > Instead using AcknowledgeRequestType, we could merge the code and have only 2 > handler functions, one for ShareAcknowledge success and one for > ShareAcknowledge failure. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-17450) Optimise the handler methods in ShareConsumeRequestManager.
Shivsundar R created KAFKA-17450: Summary: Optimise the handler methods in ShareConsumeRequestManager. Key: KAFKA-17450 URL: https://issues.apache.org/jira/browse/KAFKA-17450 Project: Kafka Issue Type: Sub-task Reporter: Shivsundar R Currently there are 4 handler functions for ShareAcknowledge responses. Instead using AcknowledgeRequestType, we could merge the code and have only 2 handler functions, one for ShareAcknowledge success and one for ShareAcknowledge failure. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17448) New consumer seek should update positions in background thread
[ https://issues.apache.org/jira/browse/KAFKA-17448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878122#comment-17878122 ] PoAn Yang commented on KAFKA-17448: --- Hi [~lianetm], thank you. I can handle it. 👍 > New consumer seek should update positions in background thread > -- > > Key: KAFKA-17448 > URL: https://issues.apache.org/jira/browse/KAFKA-17448 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0, 3.8.0, 3.7.1 >Reporter: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 4.0.0 > > > In the new AsyncKafkaConsumer, a call to seek will update the positions in > subscription state for the assigned partitions in the app thread > ([https://github.com/apache/kafka/blob/c23b6b0365af5c58b76d8ad3fb628f766f95348f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L796]) > This could lead to race conditions like we've seen when subscription state > changes in the app thread (over a set of assigned partitions), that could > have been modified in the background thread, leading to errors on "No current > assignment for partition " > [https://github.com/apache/kafka/blob/c23b6b0365af5c58b76d8ad3fb628f766f95348f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L378] > > Also, positions update is moved the background with KAFKA-17066 for the same > reason, so even if the assignment does not change, we could have a race > between the background setting positions to the committed offsets for > instance, and the app thread setting them manually via seek. > To avoid all of the above, we should have seek generate an event, send it to > the background, and then update the subscription state when processing that > event (similar to other api calls, ex, assign with KAFKA-17064) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-17448) New consumer seek should update positions in background thread
[ https://issues.apache.org/jira/browse/KAFKA-17448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] PoAn Yang reassigned KAFKA-17448: - Assignee: PoAn Yang > New consumer seek should update positions in background thread > -- > > Key: KAFKA-17448 > URL: https://issues.apache.org/jira/browse/KAFKA-17448 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0, 3.8.0, 3.7.1 >Reporter: Lianet Magrans >Assignee: PoAn Yang >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 4.0.0 > > > In the new AsyncKafkaConsumer, a call to seek will update the positions in > subscription state for the assigned partitions in the app thread > ([https://github.com/apache/kafka/blob/c23b6b0365af5c58b76d8ad3fb628f766f95348f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L796]) > This could lead to race conditions like we've seen when subscription state > changes in the app thread (over a set of assigned partitions), that could > have been modified in the background thread, leading to errors on "No current > assignment for partition " > [https://github.com/apache/kafka/blob/c23b6b0365af5c58b76d8ad3fb628f766f95348f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L378] > > Also, positions update is moved the background with KAFKA-17066 for the same > reason, so even if the assignment does not change, we could have a race > between the background setting positions to the committed offsets for > instance, and the app thread setting them manually via seek. > To avoid all of the above, we should have seek generate an event, send it to > the background, and then update the subscription state when processing that > event (similar to other api calls, ex, assign with KAFKA-17064) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17448) New consumer seek should update positions in background thread
[ https://issues.apache.org/jira/browse/KAFKA-17448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878124#comment-17878124 ] Lianet Magrans commented on KAFKA-17448: Thanks! Let me know if you have questions or when you need help with reviews. > New consumer seek should update positions in background thread > -- > > Key: KAFKA-17448 > URL: https://issues.apache.org/jira/browse/KAFKA-17448 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0, 3.8.0, 3.7.1 >Reporter: Lianet Magrans >Assignee: PoAn Yang >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 4.0.0 > > > In the new AsyncKafkaConsumer, a call to seek will update the positions in > subscription state for the assigned partitions in the app thread > ([https://github.com/apache/kafka/blob/c23b6b0365af5c58b76d8ad3fb628f766f95348f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L796]) > This could lead to race conditions like we've seen when subscription state > changes in the app thread (over a set of assigned partitions), that could > have been modified in the background thread, leading to errors on "No current > assignment for partition " > [https://github.com/apache/kafka/blob/c23b6b0365af5c58b76d8ad3fb628f766f95348f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L378] > > Also, positions update is moved the background with KAFKA-17066 for the same > reason, so even if the assignment does not change, we could have a race > between the background setting positions to the committed offsets for > instance, and the app thread setting them manually via seek. > To avoid all of the above, we should have seek generate an event, send it to > the background, and then update the subscription state when processing that > event (similar to other api calls, ex, assign with KAFKA-17064) -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: add ReconfigurableQuorumIntegrationTest [kafka]
chia7712 commented on code in PR #16991: URL: https://github.com/apache/kafka/pull/16991#discussion_r1738650234 ## metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java: ## @@ -401,7 +409,7 @@ void doFormat(BootstrapMetadata bootstrapMetadata) throws Exception { Map directoryTypes = new HashMap<>(); for (String emptyLogDir : ensemble.emptyLogDirs()) { DirectoryType directoryType = DirectoryType.calculate(emptyLogDir, -metadataLogDirectory, +metadataLogDirectory.orElseGet(() -> ""), Review Comment: `orElse("")` is good enough :smile: ## core/src/test/java/kafka/testkit/KafkaClusterTestKit.java: ## @@ -404,22 +423,44 @@ private void formatNode( boolean writeMetadataDirectory ) { try { -MetaPropertiesEnsemble.Copier copier = -new MetaPropertiesEnsemble.Copier(MetaPropertiesEnsemble.EMPTY); -for (Entry entry : ensemble.logDirProps().entrySet()) { -String logDir = entry.getKey(); -if (writeMetadataDirectory || (!ensemble.metadataLogDir().equals(Optional.of(logDir { -log.trace("Adding {} to the list of directories to format.", logDir); -copier.setLogDirProps(logDir, entry.getValue()); +Formatter formatter = new Formatter(); +formatter.setNodeId(ensemble.nodeId().getAsInt()); +formatter.setClusterId(ensemble.clusterId().get()); +if (writeMetadataDirectory) { +formatter.setDirectories(ensemble.logDirProps().keySet()); +} else { + formatter.setDirectories(ensemble.logDirProps().keySet().stream(). +filter(d -> !ensemble.metadataLogDir().get().equals(d)). +collect(Collectors.toSet())); +} +if (formatter.directories().isEmpty()) { +return; +} + formatter.setReleaseVersion(nodes.bootstrapMetadata().metadataVersion()); +formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, + nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME)); +formatter.setUnstableFeatureVersionsEnabled(true); +formatter.setIgnoreFormatted(false); +formatter.setControllerListenerName("CONTROLLER"); +if (writeMetadataDirectory) { + formatter.setMetadataLogDirectory(ensemble.metadataLogDir().get()); +} else { +formatter.setMetadataLogDirectory(Optional.empty()); Review Comment: the default value is empty, so maybe we can remove it. ## metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java: ## @@ -71,10 +72,50 @@ public void testFromRecordsListWithoutMetadataVersion() { () -> BootstrapMetadata.fromRecords(emptyList(), "quux")).getMessage()); } +private static final ApiMessageAndVersion MV_10 = +new ApiMessageAndVersion(new FeatureLevelRecord(). +setName(FEATURE_NAME). +setFeatureLevel((short) 10), (short) 0); + +private static final ApiMessageAndVersion MV_11 = Review Comment: This is unused. Maybe we should add a UT for it? ```java assertEquals((short) 11, BootstrapMetadata. fromRecords(Arrays.asList(MV_10, MV_11), "src").featureLevel(FEATURE_NAME)); ``` ## core/src/main/java/kafka/server/ServerSocketFactory.java: ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.network.Selectable; +import org.apache.kafka.common.utils.Utils; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketException; +import java.nio.channels.ServerSocketChannel; + +public interface ServerSocketFactory { +ServerSocketChannel openServerSocket( +String listenerName, +InetSocketAddress socketAddress, +int listenBacklogSize, +int recvBufferSize +) thro
Re: [PR] MINOR: Set `group.coordinator.rebalance.protocols` to `classic,consumer` by default [kafka]
chia7712 commented on code in PR #17057: URL: https://github.com/apache/kafka/pull/17057#discussion_r1738713919 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -576,19 +576,13 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) throw new ConfigException(s"Disabling the '${GroupType.CLASSIC}' protocol is not supported.") } if (protocols.contains(GroupType.CONSUMER)) { - if (processRoles.isEmpty) { -throw new ConfigException(s"The new '${GroupType.CONSUMER}' rebalance protocol is only supported in KRaft cluster.") - } - if (!isNewGroupCoordinatorEnabled) { -throw new ConfigException(s"The new '${GroupType.CONSUMER}' rebalance protocol is only supported by the new group coordinator.") + if (processRoles.isEmpty || !isNewGroupCoordinatorEnabled) { +warn(s"The new '${GroupType.CONSUMER}' rebalance protocol is only supported in KRaft cluster with the new group coordinator.") Review Comment: > it would log on every calls from the consumers you are right :100: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-17449: Move Quota classes to server module [kafka]
mimaison opened a new pull request, #17060: URL: https://github.com/apache/kafka/pull/17060 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-17450: Reduced the handlers for handling ShareAcknowledgeResponse. [kafka]
ShivsundarR opened a new pull request, #17061: URL: https://github.com/apache/kafka/pull/17061 *What* Currently there are 4 handler functions present for handling ShareAcknowledge responses. ShareConsumeRequestManager had an interface and the respective handlers would implement it. Instead of having 4 different handlers for this, now using AcknowledgeRequestType, we could merge the code and have only 2 handler functions, one for ShareAcknowledge success and one for ShareAcknowledge failure, eliminating the need for the interface. This PR also fixes a bug - We were not using the time at which the response was received while handling the ShareAcknowledge response, we were using an outdated time. Now the bug is fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17347: Add missing client-metrics option to kafka-configs.sh [kafka]
AndrewJSchofield commented on PR #17046: URL: https://github.com/apache/kafka/pull/17046#issuecomment-2321352952 Only 3 unit test failures, unrelated to this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-17449) Move Quota classes to server-common module
[ https://issues.apache.org/jira/browse/KAFKA-17449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison updated KAFKA-17449: --- Summary: Move Quota classes to server-common module (was: Move Quota classes to server module) > Move Quota classes to server-common module > -- > > Key: KAFKA-17449 > URL: https://issues.apache.org/jira/browse/KAFKA-17449 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Mickael Maison >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17449) Move Quota classes to server-common module
[ https://issues.apache.org/jira/browse/KAFKA-17449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison updated KAFKA-17449: --- Description: RLMQuotaManager which will utlimately move to storage depends on the QuotaType and QuotaUtils classes, so it makes sense to put them in server-common instead of server. > Move Quota classes to server-common module > -- > > Key: KAFKA-17449 > URL: https://issues.apache.org/jira/browse/KAFKA-17449 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Mickael Maison >Priority: Major > > RLMQuotaManager which will utlimately move to storage depends on the > QuotaType and QuotaUtils classes, so it makes sense to put them in > server-common instead of server. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17386: Remove broker-list, threads, num-fetch-threads in ConsumerPerformance [kafka]
chia7712 merged PR #16983: URL: https://github.com/apache/kafka/pull/16983 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-17386) Remove broker-list, threads, num-fetch-threads in ConsumerPerformance
[ https://issues.apache.org/jira/browse/KAFKA-17386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-17386. Fix Version/s: 4.0.0 Resolution: Fixed > Remove broker-list, threads, num-fetch-threads in ConsumerPerformance > - > > Key: KAFKA-17386 > URL: https://issues.apache.org/jira/browse/KAFKA-17386 > Project: Kafka > Issue Type: Sub-task >Reporter: PoAn Yang >Assignee: PoAn Yang >Priority: Minor > Fix For: 4.0.0 > > > The > [broker-list|https://github.com/apache/kafka/blob/944c1353a925858ea9bd9024a713cd7301f55133/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java#L268C45-L271], > > [threads|https://github.com/apache/kafka/blob/944c1353a925858ea9bd9024a713cd7301f55133/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java#L298-L302], > > [num-fetch-threads|https://github.com/apache/kafka/blob/944c1353a925858ea9bd9024a713cd7301f55133/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java#L303-L307] > are deprecated options in ConsumerPerformance. We can consider to remove > them in 4.0. > https://cwiki.apache.org/confluence/display/KAFKA/KIP-499+-+Unify+connection+name+flag+for+command+line+tool > https://issues.apache.org/jira/browse/KAFKA-10126 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17450: Reduced the handlers for handling ShareAcknowledgeResponse. [kafka]
AndrewJSchofield commented on code in PR #17061: URL: https://github.com/apache/kafka/pull/17061#discussion_r1738762496 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java: ## @@ -613,74 +607,100 @@ private void handleShareAcknowledgeSuccess(Node fetchTarget, ShareAcknowledgeRequestData requestData, AcknowledgeRequestState acknowledgeRequestState, ClientResponse resp, - long currentTimeMs) { + long responseCompletionTimeMs) { try { +//acknowledgeRequestState.handleShareAcknowledgeSuccess(resp); Review Comment: This comment seems spurious. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16863 : Deprecate default exception handlers [kafka]
muralibasani commented on PR #17005: URL: https://github.com/apache/kafka/pull/17005#issuecomment-2321425276 > Committed the fix. > > LGTM now, assuming build passes... Great, thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16863 : Deprecate default exception handlers [kafka]
muralibasani commented on PR #17005: URL: https://github.com/apache/kafka/pull/17005#issuecomment-2321453957 there are failing tests -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17137[part-5]: Ensure Admin APIs are properly tested [kafka]
m1a2st commented on code in PR #16905: URL: https://github.com/apache/kafka/pull/16905#discussion_r1738862139 ## core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala: ## @@ -337,6 +338,53 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu assertFutureExceptionTypeEquals(results.values.get(emptyResourceNameAcl), classOf[InvalidRequestException]) } + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testCreateDelegationTokenWithLargeTimeout(quorum: String): Unit = { +client = createAdminClient +val timeout = Long.MaxValue + +val options = new CreateDelegationTokenOptions().maxlifeTimeMs(timeout) +val token = client.createDelegationToken(options).delegationToken().get() + + assertEquals(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_MS_DEFAULT, token.tokenInfo.maxTimestamp - token.tokenInfo.issueTimestamp) +assertTrue(token.tokenInfo.maxTimestamp >= token.tokenInfo.expiryTimestamp) + } + + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testCreateDelegationTokenWithNegativeTimeout(quorum: String): Unit = { +client = createAdminClient +val timeout = -1 + +val options = new CreateDelegationTokenOptions().maxlifeTimeMs(timeout) +val token = client.createDelegationToken(options).delegationToken().get() + + assertEquals(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_MS_DEFAULT, token.tokenInfo.maxTimestamp - token.tokenInfo.issueTimestamp) +assertTrue(token.tokenInfo.maxTimestamp >= token.tokenInfo.expiryTimestamp) + } + + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testExpiredTimeStampLargerThanMaxLifeStamp(quorum: String): Unit = { +client = createAdminClient +val timeout = -1 + +val createOptions = new CreateDelegationTokenOptions().maxlifeTimeMs(timeout) +val token = client.createDelegationToken(createOptions).delegationToken().get() + + assertEquals(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_MS_DEFAULT, token.tokenInfo.maxTimestamp - token.tokenInfo.issueTimestamp) +assertTrue(token.tokenInfo.maxTimestamp >= token.tokenInfo.expiryTimestamp) Review Comment: ditto ## core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala: ## @@ -337,6 +338,53 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu assertFutureExceptionTypeEquals(results.values.get(emptyResourceNameAcl), classOf[InvalidRequestException]) } + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testCreateDelegationTokenWithLargeTimeout(quorum: String): Unit = { +client = createAdminClient +val timeout = Long.MaxValue + +val options = new CreateDelegationTokenOptions().maxlifeTimeMs(timeout) +val token = client.createDelegationToken(options).delegationToken().get() + + assertEquals(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_MS_DEFAULT, token.tokenInfo.maxTimestamp - token.tokenInfo.issueTimestamp) +assertTrue(token.tokenInfo.maxTimestamp >= token.tokenInfo.expiryTimestamp) + } + + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testCreateDelegationTokenWithNegativeTimeout(quorum: String): Unit = { +client = createAdminClient +val timeout = -1 + +val options = new CreateDelegationTokenOptions().maxlifeTimeMs(timeout) +val token = client.createDelegationToken(options).delegationToken().get() + + assertEquals(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_MS_DEFAULT, token.tokenInfo.maxTimestamp - token.tokenInfo.issueTimestamp) +assertTrue(token.tokenInfo.maxTimestamp >= token.tokenInfo.expiryTimestamp) Review Comment: ditto ## core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala: ## @@ -337,6 +338,53 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu assertFutureExceptionTypeEquals(results.values.get(emptyResourceNameAcl), classOf[InvalidRequestException]) } + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testCreateDelegationTokenWithLargeTimeout(quorum: String): Unit = { +client = createAdminClient +val timeout = Long.MaxValue + +val options = new CreateDelegationTokenOptions().maxlifeTimeMs(timeout) +val token = client.createDelegationToken(options).delegationToken().get() + + assertEquals(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_MS_DEFAULT, token.tokenInfo.maxTimestamp - token.tokenInfo.issueTimestamp) +assertTrue(token.tokenInfo.maxTimestamp >= token.tokenInfo.expiryTimestamp) Review Comment: nit: the `token.tokenInfo` have been used four times, I consider make it a variable is more better, WDTY ``` val tokenInfo = client.createDelegationToken(options).delegationToken(
Re: [PR] KAFKA-17449: Move Quota classes to server-common module [kafka]
chia7712 commented on code in PR #17060: URL: https://github.com/apache/kafka/pull/17060#discussion_r1738763455 ## server-common/src/main/java/org/apache/kafka/server/quota/QuotaUtils.java: ## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.quota; + +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.Measurable; +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.metrics.stats.Rate; + +/** + * Helper functions related to quotas + */ +public class QuotaUtils { + +/** + * This calculates the amount of time needed to bring the observed rate within quota + * assuming that no new metrics are recorded. + * + * If O is the observed rate and T is the target rate over a window of W, to bring O down to T, + * we need to add a delay of X to W such that O * W / (W + X) = T. + * Solving for X, we get X = (O - T)/T * W. + * + * @param timeMs current time in milliseconds + * @return Delay in milliseconds + */ +public static long throttleTime(QuotaViolationException e, long timeMs) { +double difference = e.value() - e.bound(); +// Use the precise window used by the rate calculation +double throttleTimeMs = difference / e.bound() * windowSize(e.metric(), timeMs); +return Math.round(throttleTimeMs); +} + +/** + * Calculates the amount of time needed to bring the observed rate within quota using the same algorithm as + * throttleTime() utility method but the returned value is capped to given maxThrottleTime + */ +public static long boundedThrottleTime(QuotaViolationException e, long maxThrottleTime, long timeMs) { +return Math.min(throttleTime(e, timeMs), maxThrottleTime); +} + +/** + * Returns window size of the given metric + * + * @param metric metric with measurable of type Rate + * @param timeMs current time in milliseconds + * @throws IllegalArgumentException if given measurable is not Rate + */ +private static long windowSize(KafkaMetric metric, long timeMs) { +return measurableAsRate(metric.metricName(), metric.measurable()).windowSize(metric.config(), timeMs); +} + +/** + * Casts provided Measurable to Rate + * @throws IllegalArgumentException if given measurable is not Rate + */ +private static Rate measurableAsRate(MetricName name, Measurable measurable) { +if (measurable instanceof Rate) { +return (Rate) measurable; +} else { +throw new IllegalArgumentException("Metric $name is not a Rate metric, value " + measurable); Review Comment: `$name` does not work in java code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: add concurrent test for consumer.poll [kafka]
chia7712 commented on code in PR #17047: URL: https://github.com/apache/kafka/pull/17047#discussion_r1738898118 ## clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java: ## @@ -3382,7 +3382,43 @@ public void testCommittedThrowsTimeoutExceptionForNoResponse(GroupProtocol group assertEquals("Timeout of 1000ms expired before the last committed offset for partitions [test-0] could be determined. " + "Try tuning default.api.timeout.ms larger to relax the threshold.", timeoutException.getMessage()); } - + +@ParameterizedTest +@EnumSource(value = GroupProtocol.class) +public void testPreventMultiThread(GroupProtocol groupProtocol) throws InterruptedException { Review Comment: Could you please consider writing it by UT. https://github.com/apache/kafka/blob/4a3ab89f95aba294bb536af55548522d946d1ee3/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L2428 is a good example -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17374: add bootstrap.controller to kafka-reassign-partitions.sh [kafka]
chia7712 commented on PR #16964: URL: https://github.com/apache/kafka/pull/16964#issuecomment-2321570487 @m1a2st Could you please rebase code to include #16644? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17449: Move Quota classes to server-common module [kafka]
mimaison commented on code in PR #17060: URL: https://github.com/apache/kafka/pull/17060#discussion_r1738925546 ## server-common/src/main/java/org/apache/kafka/server/quota/QuotaUtils.java: ## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.quota; + +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.Measurable; +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.metrics.stats.Rate; + +/** + * Helper functions related to quotas + */ +public class QuotaUtils { + +/** + * This calculates the amount of time needed to bring the observed rate within quota + * assuming that no new metrics are recorded. + * + * If O is the observed rate and T is the target rate over a window of W, to bring O down to T, + * we need to add a delay of X to W such that O * W / (W + X) = T. + * Solving for X, we get X = (O - T)/T * W. + * + * @param timeMs current time in milliseconds + * @return Delay in milliseconds + */ +public static long throttleTime(QuotaViolationException e, long timeMs) { +double difference = e.value() - e.bound(); +// Use the precise window used by the rate calculation +double throttleTimeMs = difference / e.bound() * windowSize(e.metric(), timeMs); +return Math.round(throttleTimeMs); +} + +/** + * Calculates the amount of time needed to bring the observed rate within quota using the same algorithm as + * throttleTime() utility method but the returned value is capped to given maxThrottleTime + */ +public static long boundedThrottleTime(QuotaViolationException e, long maxThrottleTime, long timeMs) { +return Math.min(throttleTime(e, timeMs), maxThrottleTime); +} + +/** + * Returns window size of the given metric + * + * @param metric metric with measurable of type Rate + * @param timeMs current time in milliseconds + * @throws IllegalArgumentException if given measurable is not Rate + */ +private static long windowSize(KafkaMetric metric, long timeMs) { +return measurableAsRate(metric.metricName(), metric.measurable()).windowSize(metric.config(), timeMs); +} + +/** + * Casts provided Measurable to Rate + * @throws IllegalArgumentException if given measurable is not Rate + */ +private static Rate measurableAsRate(MetricName name, Measurable measurable) { +if (measurable instanceof Rate) { +return (Rate) measurable; +} else { +throw new IllegalArgumentException("Metric $name is not a Rate metric, value " + measurable); Review Comment: Oops, I missed this one! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15909: Throw error when consumer configured with empty/whitespace-only group.id for LegacyKafkaConsumer [kafka]
chia7712 merged PR #16933: URL: https://github.com/apache/kafka/pull/16933 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-15909) Throw error when consumer configured with empty/whitespace-only group.id for LegacyKafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-15909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-15909. Resolution: Fixed > Throw error when consumer configured with empty/whitespace-only group.id for > LegacyKafkaConsumer > > > Key: KAFKA-15909 > URL: https://issues.apache.org/jira/browse/KAFKA-15909 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: PoAn Yang >Priority: Major > Fix For: 4.0.0 > > > Per > [KIP-289|https://cwiki.apache.org/confluence/display/KAFKA/KIP-289%3A+Improve+the+default+group+id+behavior+in+KafkaConsumer], > the use of an empty value for {{group.id}} configuration was deprecated back > in 2.2.0. > In 3.7, the {{AsyncKafkaConsumer}} implementation will throw an error (see > KAFKA-14438). > This task is to update the {{LegacyKafkaConsumer}} implementation to throw an > error in 4.0. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17428) remote segments deleted in RLMCopyTask stays DELETE_SEGMENT_STARTED state
[ https://issues.apache.org/jira/browse/KAFKA-17428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Federico Valeri updated KAFKA-17428: Summary: remote segments deleted in RLMCopyTask stays DELETE_SEGMENT_STARTED state (was: remote segments deleted in RLMCopyTask stays `COPY_SEGMENT_START` state) > remote segments deleted in RLMCopyTask stays DELETE_SEGMENT_STARTED state > - > > Key: KAFKA-17428 > URL: https://issues.apache.org/jira/browse/KAFKA-17428 > Project: Kafka > Issue Type: Improvement >Reporter: Luke Chen >Assignee: Federico Valeri >Priority: Major > Labels: kip-required > > Currently, we will delete failed uploaded segment and Custom metadata size > exceeded segments in copyLogSegment in RLMCopyTask. But after deletion, these > segment states are still in COPY_SEGMENT_STARTED. That "might" cause > unexpected issues in the future. We'd better to move the state from > {{COPY_SEGMENT_STARTED}} -> {{DELETE_SEGMENT_STARTED}} -> > {{DELETE_SEGMENT_FINISHED}} > > updated: > I thought about this when I first had a look at it and one thing that > bothered me is that {{DELETE_SEGMENT_STARTED}} means to me that we're now in > a state where we attempt deletion. However if the remote store is down and we > fail to copy and delete we will leave that segment in > {{DELETE_SEGMENT_STARTED}} and not attempt to delete it till the segment > itself breaches retention.ms/bytes. > We can probably just make it clearer but that was my thought at the time. > So, maybe when in deletion loop, we can add {{DELETE_SEGMENT_STARTED}} > segments into deletion directly, but that also needs to consider the > retention size calculation. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17428) Remote segments stay in COPY_SEGMENT_STARTED state after RLMCopyTask fails to upload
[ https://issues.apache.org/jira/browse/KAFKA-17428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Federico Valeri updated KAFKA-17428: Summary: Remote segments stay in COPY_SEGMENT_STARTED state after RLMCopyTask fails to upload (was: Remote segments stay in DELETE_SEGMENT_STARTED state after RLMCopyTask fails to upload) > Remote segments stay in COPY_SEGMENT_STARTED state after RLMCopyTask fails to > upload > > > Key: KAFKA-17428 > URL: https://issues.apache.org/jira/browse/KAFKA-17428 > Project: Kafka > Issue Type: Improvement >Reporter: Luke Chen >Assignee: Federico Valeri >Priority: Major > Labels: kip-required > > Currently, we will delete failed uploaded segment and Custom metadata size > exceeded segments in copyLogSegment in RLMCopyTask. But after deletion, these > segment states are still in COPY_SEGMENT_STARTED. That "might" cause > unexpected issues in the future. We'd better to move the state from > {{COPY_SEGMENT_STARTED}} -> {{DELETE_SEGMENT_STARTED}} -> > {{DELETE_SEGMENT_FINISHED}} > > updated: > I thought about this when I first had a look at it and one thing that > bothered me is that {{DELETE_SEGMENT_STARTED}} means to me that we're now in > a state where we attempt deletion. However if the remote store is down and we > fail to copy and delete we will leave that segment in > {{DELETE_SEGMENT_STARTED}} and not attempt to delete it till the segment > itself breaches retention.ms/bytes. > We can probably just make it clearer but that was my thought at the time. > So, maybe when in deletion loop, we can add {{DELETE_SEGMENT_STARTED}} > segments into deletion directly, but that also needs to consider the > retention size calculation. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17428) Remote segments stay in DELETE_SEGMENT_STARTED state after RLMCopyTask fails to upload
[ https://issues.apache.org/jira/browse/KAFKA-17428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Federico Valeri updated KAFKA-17428: Summary: Remote segments stay in DELETE_SEGMENT_STARTED state after RLMCopyTask fails to upload (was: remote segments deleted in RLMCopyTask stays DELETE_SEGMENT_STARTED state) > Remote segments stay in DELETE_SEGMENT_STARTED state after RLMCopyTask fails > to upload > -- > > Key: KAFKA-17428 > URL: https://issues.apache.org/jira/browse/KAFKA-17428 > Project: Kafka > Issue Type: Improvement >Reporter: Luke Chen >Assignee: Federico Valeri >Priority: Major > Labels: kip-required > > Currently, we will delete failed uploaded segment and Custom metadata size > exceeded segments in copyLogSegment in RLMCopyTask. But after deletion, these > segment states are still in COPY_SEGMENT_STARTED. That "might" cause > unexpected issues in the future. We'd better to move the state from > {{COPY_SEGMENT_STARTED}} -> {{DELETE_SEGMENT_STARTED}} -> > {{DELETE_SEGMENT_FINISHED}} > > updated: > I thought about this when I first had a look at it and one thing that > bothered me is that {{DELETE_SEGMENT_STARTED}} means to me that we're now in > a state where we attempt deletion. However if the remote store is down and we > fail to copy and delete we will leave that segment in > {{DELETE_SEGMENT_STARTED}} and not attempt to delete it till the segment > itself breaches retention.ms/bytes. > We can probably just make it clearer but that was my thought at the time. > So, maybe when in deletion loop, we can add {{DELETE_SEGMENT_STARTED}} > segments into deletion directly, but that also needs to consider the > retention size calculation. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-17451) Remove deprecated Consumer#committed
Chia-Ping Tsai created KAFKA-17451: -- Summary: Remove deprecated Consumer#committed Key: KAFKA-17451 URL: https://issues.apache.org/jira/browse/KAFKA-17451 Project: Kafka Issue Type: Sub-task Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai The APIs were deprecated by KAFKA-8880 which is back in 2.4.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17449: Move Quota classes to server-common module [kafka]
chia7712 commented on PR #17060: URL: https://github.com/apache/kafka/pull/17060#issuecomment-2321633915 @mimaison Could you please fix the conflicts :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Back-port KAFKA-16230 to 3.7 branch [kafka]
lianetm commented on PR #16951: URL: https://github.com/apache/kafka/pull/16951#issuecomment-2321721217 Hey @kirktrue , took a first look and overall it looks good. Is there a run of the system tests with this change? (agree that failures in PlainTextConsumer are unrelated to this PR). Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17449: Move Quota classes to server-common module [kafka]
mimaison commented on PR #17060: URL: https://github.com/apache/kafka/pull/17060#issuecomment-2321731110 Yup, just rebased. Let's wait for the CI to run again -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-17451) Remove deprecated Consumer#committed
[ https://issues.apache.org/jira/browse/KAFKA-17451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878173#comment-17878173 ] Dmitry Werner commented on KAFKA-17451: --- [~chia7712] Hello, can I take it? > Remove deprecated Consumer#committed > > > Key: KAFKA-17451 > URL: https://issues.apache.org/jira/browse/KAFKA-17451 > Project: Kafka > Issue Type: Sub-task >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > > The APIs were deprecated by KAFKA-8880 which is back in 2.4.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17137: Feat admin client it acl configs [kafka]
chia7712 commented on code in PR #16648: URL: https://github.com/apache/kafka/pull/16648#discussion_r1739046762 ## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala: ## @@ -96,6 +96,50 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { super.tearDown() } + @ParameterizedTest + @Timeout(30) + @ValueSource(strings = Array("zk", "kraft")) + def testDescribeConfigWithOptionTimeoutMs(quorum: String): Unit = { +val config = createConfig +config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s"localhost:${TestUtils.IncorrectBrokerPort}") +val brokenClient = Admin.create(config) + +try { + // Describe and broker + val brokerResource1 = new ConfigResource(ConfigResource.Type.BROKER, brokers(1).config.brokerId.toString) + val brokerResource2 = new ConfigResource(ConfigResource.Type.BROKER, brokers(2).config.brokerId.toString) + val configResources = Seq(brokerResource1, brokerResource2) + + val exception = assertThrows(classOf[ExecutionException], () => { +brokenClient.describeConfigs(configResources.asJava,new DescribeConfigsOptions().timeoutMs(0)).all().get() + }) + assertInstanceOf(classOf[TimeoutException], exception.getCause) +} finally brokenClient.close(time.Duration.ZERO) + } + + @ParameterizedTest + @Timeout(30) + @ValueSource(strings = Array("zk", "kraft")) + def testAlterConfigsWithOptionTimeoutMs(quorum: String): Unit = { +client = createAdminClient +val config = createConfig +config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s"localhost:${TestUtils.IncorrectBrokerPort}") +val brokenClient = Admin.create(config) + +try { + val alterLogLevelsEntries = Seq( +new ConfigEntry("kafka.controller.KafkaController", LogLevelConfig.INFO_LOG_LEVEL) + ).asJavaCollection + + val exception = assertThrows(classOf[ExecutionException], () => { +brokenClient.alterConfigs( +Map(brokerLoggerConfigResource -> new Config(alterLogLevelsEntries)).asJava, + new AlterConfigsOptions().timeoutMs(0)).all() Review Comment: please call `.get()` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-17451) Remove deprecated Consumer#committed
[ https://issues.apache.org/jira/browse/KAFKA-17451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878180#comment-17878180 ] Chia-Ping Tsai commented on KAFKA-17451: [~javakillah] oh, sorry that my team member is working on that already :) > Remove deprecated Consumer#committed > > > Key: KAFKA-17451 > URL: https://issues.apache.org/jira/browse/KAFKA-17451 > Project: Kafka > Issue Type: Sub-task >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > > The APIs were deprecated by KAFKA-8880 which is back in 2.4.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16863 : Deprecate default exception handlers [kafka]
mjsax commented on PR #17005: URL: https://github.com/apache/kafka/pull/17005#issuecomment-2321839005 Yeah. Looks related. Can you take a look and push a fix? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-17452) Fix flaky QuorumControllerTest#testUncleanShutdownBroker
Chia-Ping Tsai created KAFKA-17452: -- Summary: Fix flaky QuorumControllerTest#testUncleanShutdownBroker Key: KAFKA-17452 URL: https://issues.apache.org/jira/browse/KAFKA-17452 Project: Kafka Issue Type: Test Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai {code:java} Error org.opentest4j.AssertionFailedError: PartitionRegistration(replicas=[3, 1, 2], directories=[AA, AA, AA], isr=[3], removingReplicas=[], addingReplicas=[], elr=[], lastKnownElr=[], leader=-1, leaderRecoveryState=RECOVERED, leaderEpoch=3, partitionEpoch=3) ==> expected: <1> but was: <0> Stacktrace org.opentest4j.AssertionFailedError: PartitionRegistration(replicas=[3, 1, 2], directories=[AA, AA, AA], isr=[3], removingReplicas=[], addingReplicas=[], elr=[], lastKnownElr=[], leader=-1, leaderRecoveryState=RECOVERED, leaderEpoch=3, partitionEpoch=3) ==> expected: <1> but was: <0> at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197) at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:150) at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:563) at org.apache.kafka.controller.QuorumControllerTest.testUncleanShutdownBroker(QuorumControllerTest.java:427) at java.lang.reflect.Method.invoke(Method.java:498) at java.util.ArrayList.forEach(ArrayList.java:1259) at java.util.ArrayList.forEach(ArrayList.java:1259) Suppressed: org.apache.kafka.server.fault.FaultHandlerException: fatalFaultHandler: exception while renouncing leadership: Attempt to resign from epoch 1 which is larger than the current epoch 0 at org.apache.kafka.metalog.LocalLogManager.resign(LocalLogManager.java:752) at org.apache.kafka.controller.QuorumController.renounce(QuorumController.java:1268) at org.apache.kafka.controller.QuorumController.handleEventException(QuorumController.java:545) at org.apache.kafka.controller.QuorumController.access$800(QuorumController.java:180) at org.apache.kafka.controller.QuorumController$ControllerWriteEvent.complete(QuorumController.java:878) at org.apache.kafka.controller.QuorumController$ControllerWriteEvent.handleException(QuorumController.java:868) at org.apache.kafka.queue.KafkaEventQueue$EventContext.completeWithException(KafkaEventQueue.java:153) at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:142) at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:215) at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:186) at java.lang.Thread.run(Thread.java:750) Caused by: java.lang.IllegalArgumentException: Attempt to resign from epoch 1 which is larger than the current epoch 0 ... 11 more {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-17453) Fix flaky PlaintextConsumerFetchTest#testFetchOutOfRangeOffsetResetConfigLatest
Chia-Ping Tsai created KAFKA-17453: -- Summary: Fix flaky PlaintextConsumerFetchTest#testFetchOutOfRangeOffsetResetConfigLatest Key: KAFKA-17453 URL: https://issues.apache.org/jira/browse/KAFKA-17453 Project: Kafka Issue Type: Test Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai {code:java} Errororg.opentest4j.AssertionFailedError: Failed to observe commit callback before timeoutStacktraceorg.opentest4j.AssertionFailedError: Failed to observe commit callback before timeout at kafka.api.AbstractConsumerTest.sendAndAwaitAsyncCommit(AbstractConsumerTest.scala:293) at kafka.api.AbstractConsumerTest.ensureNoRebalance(AbstractConsumerTest.scala:403) at kafka.api.BaseConsumerTest.testCoordinatorFailover(BaseConsumerTest.scala:110) at java.lang.reflect.Method.invoke(Method.java:498) at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647) at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) at java.util.ArrayList.forEach(ArrayList.java:1259) at java.util.ArrayList.forEach(ArrayList.java:1259) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17453) Fix flaky PlaintextConsumerFetchTest#testFetchOutOfRangeOffsetResetConfigLatest
[ https://issues.apache.org/jira/browse/KAFKA-17453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-17453: --- Description: {code:java} Errorjava.util.NoSuchElementExceptionStacktracejava.util.NoSuchElementException at org.apache.kafka.common.utils.AbstractIterator.next(AbstractIterator.java:52) at kafka.api.PlaintextConsumerFetchTest.testFetchOutOfRangeOffsetResetConfigLatest(PlaintextConsumerFetchTest.scala:104) at java.lang.reflect.Method.invoke(Method.java:498) at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647) at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) at java.util.ArrayList.forEach(ArrayList.java:1259) at java.util.ArrayList.forEach(ArrayList.java:1259){code} was: {code:java} Errororg.opentest4j.AssertionFailedError: Failed to observe commit callback before timeoutStacktraceorg.opentest4j.AssertionFailedError: Failed to observe commit callback before timeout at kafka.api.AbstractConsumerTest.sendAndAwaitAsyncCommit(AbstractConsumerTest.scala:293) at kafka.api.AbstractConsumerTest.ensureNoRebalance(AbstractConsumerTest.scala:403) at kafka.api.BaseConsumerTest.testCoordinatorFailover(BaseConsumerTest.scala:110) at java.lang.reflect.Method.invoke(Method.java:498) at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)at java.uti
Re: [PR] KAFKA-17331: Throw unsupported version exception if the server does NOT support EarliestLocalSpec and LatestTieredSpec [kafka]
chia7712 commented on PR #16873: URL: https://github.com/apache/kafka/pull/16873#issuecomment-2321891055 The failed tests are flaky. the related jira are shown below. https://issues.apache.org/jira/browse/KAFKA-17265 https://issues.apache.org/jira/browse/KAFKA-16174 https://issues.apache.org/jira/browse/KAFKA-16993 https://issues.apache.org/jira/browse/KAFKA-17364 https://issues.apache.org/jira/browse/KAFKA-17452 https://issues.apache.org/jira/browse/KAFKA-16601 https://issues.apache.org/jira/browse/KAFKA-15103 https://issues.apache.org/jira/browse/KAFKA-7648 https://issues.apache.org/jira/browse/KAFKA-16024 https://issues.apache.org/jira/browse/KAFKA-17453 https://issues.apache.org/jira/browse/KAFKA-15146 https://issues.apache.org/jira/browse/KAFKA-17395 https://issues.apache.org/jira/browse/KAFKA-8115 https://issues.apache.org/jira/browse/KAFKA-15529 https://issues.apache.org/jira/browse/KAFKA-16634 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17331: Throw unsupported version exception if the server does NOT support EarliestLocalSpec and LatestTieredSpec [kafka]
chia7712 commented on PR #16873: URL: https://github.com/apache/kafka/pull/16873#issuecomment-2321894140 I will merge this PR to trunk and 3.9 tomorrow if no objection -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17331: Throw unsupported version exception if the server does NOT support EarliestLocalSpec and LatestTieredSpec [kafka]
junrao commented on code in PR #16873: URL: https://github.com/apache/kafka/pull/16873#discussion_r1739140459 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1104,35 +1104,43 @@ class KafkaApis(val requestChannel: RequestChannel, val responseTopics = authorizedRequestInfo.map { topic => val responsePartitions = topic.partitions.asScala.map { partition => -val topicPartition = new TopicPartition(topic.name, partition.partitionIndex) - -try { - val offsets = replicaManager.legacyFetchOffsetsForTimestamp( -topicPartition = topicPartition, -timestamp = partition.timestamp, -maxNumOffsets = partition.maxNumOffsets, -isFromConsumer = offsetRequest.replicaId == ListOffsetsRequest.CONSUMER_REPLICA_ID, -fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetsRequest.DEBUGGING_REPLICA_ID) +if (partition.timestamp() < ListOffsetsRequest.EARLIEST_TIMESTAMP) { Review Comment: Should we remove the code related to `EARLIEST_LOCAL_TIMESTAMP` in `UnifiedLog.legacyFetchOffsetsBefore`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16792: Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0) [kafka]
lianetm commented on code in PR #16982: URL: https://github.com/apache/kafka/pull/16982#discussion_r1739141821 ## clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java: ## @@ -1053,12 +1053,20 @@ public void testFetchStableOffsetThrowInCommitted(GroupProtocol groupProtocol) { assertThrows(UnsupportedVersionException.class, () -> setupThrowableConsumer(groupProtocol).committed(Collections.singleton(tp0))); } -// TODO: this test triggers a bug with the CONSUMER group protocol implementation. -// The bug will be investigated and fixed so this test can use both group protocols. @ParameterizedTest -@EnumSource(value = GroupProtocol.class, names = "CLASSIC") -public void testFetchStableOffsetThrowInPoll(GroupProtocol groupProtocol) { -assertThrows(UnsupportedVersionException.class, () -> setupThrowableConsumer(groupProtocol).poll(Duration.ZERO)); +@EnumSource(GroupProtocol.class) +public void testFetchStableOffsetThrowInPoll(GroupProtocol groupProtocol) throws InterruptedException { +setupThrowableConsumer(groupProtocol); +TestUtils.waitForCondition(() -> { Review Comment: this change still has me thinking. This test is about a single call to `poll(ZERO)`, that is expected to throw an exception, but interesting fact is that the exception is generated when building the request [here](https://github.com/apache/kafka/blob/70dd577286de31ef20dc4f198e95f9b9e4479b47/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java#L146) (it does not require the actual send or response). So I wonder if the async consumer should somehow ensure that when poll returns (even with low time), it has allowed for at least one run of the background thread runOnce? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)
[ https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878191#comment-17878191 ] Lianet Magrans commented on KAFKA-16792: Thinking more about this, I wonder if we could make things slightly better and more consistent with the classic consumer, if we ensure that consumer.poll allows for at least one run of the background thread before returning? With that we would guarantee that poll(lowTimeout) generates the requests it needs (just like the classic consumer does), even though it may not have the time to wait for them. Note that this would instantly align the bevahiour in tests like testFetchStableOffsetThrowInPoll I guess, see my comment [there|https://github.com/apache/kafka/pull/16982/files#r1739141821] . Maybe a PollEvent that would do nothing in the background other than complete, but would serve as a signal to the foreground indicating that there're been 1 full cycle in the background. (just rough ideas for now, this requires more thinking, thoughts?) > Enable consumer unit tests that fail to fetch offsets only for new consumer > with poll(0) > > > Key: KAFKA-16792 > URL: https://issues.apache.org/jira/browse/KAFKA-16792 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Lianet Magrans >Assignee: PoAn Yang >Priority: Blocker > Labels: kip-848-client-support > Fix For: 4.0.0 > > > Enable the following unit tests for the new async consumer in > KafkaConsumerTest: > - testCurrentLag > - testFetchStableOffsetThrowInPoll > - testListOffsetShouldUpdateSubscriptions > - testPollReturnsRecords > - testResetToCommittedOffset > - testResetUsingAutoResetPolicy > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16758) Extend Consumer#close with option to leave the group or not
[ https://issues.apache.org/jira/browse/KAFKA-16758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878220#comment-17878220 ] Lucas Brutschy commented on KAFKA-16758: Hey lianet, are you still planning to do this? > Extend Consumer#close with option to leave the group or not > --- > > Key: KAFKA-16758 > URL: https://issues.apache.org/jira/browse/KAFKA-16758 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Reporter: A. Sophie Blee-Goldman >Assignee: Lianet Magrans >Priority: Major > Labels: needs-kip > Fix For: 4.0.0 > > > See comments on https://issues.apache.org/jira/browse/KAFKA-16514 for the > full context. > Essentially we would get rid of the "internal.leave.group.on.close" config > that is used as a backdoor by Kafka Streams right now to prevent closed > consumers from leaving the group, thus reducing unnecessary task movements > after a simple bounce. > This would be replaced by an actual public API that would allow the caller to > opt in or out to the LeaveGroup when close is called. This would be similar > to the KafkaStreams#close(CloseOptions) API, and in fact would be how that > API will be implemented (since it only works for static groups at the moment > as noted in KAFKA-16514 ) > This has several benefits over the current situation: > # It allows plain consumer apps to opt-out of leaving the group when closed, > which is currently not possible through any public API (only an internal > backdoor config) > # It enables the caller to dynamically select the appropriate action > depending on why the client is being closed – for example, you would not want > the consumer to leave the group during a simple restart, but would want it to > leave the group when shutting down the app or if scaling down the node. This > is not possible today, even with the internal config, since configs are > immutable > # It can be leveraged to "fix" the KafkaStreams#close(closeOptions) API so > that the user's choice to leave the group during close will be respected for > non-static members -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-16758) Extend Consumer#close with option to leave the group or not
[ https://issues.apache.org/jira/browse/KAFKA-16758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878220#comment-17878220 ] Lucas Brutschy edited comment on KAFKA-16758 at 8/30/24 6:19 PM: - Hey [~lianetm], are you still planning to do this? was (Author: JIRAUSER302322): Hey lianet, are you still planning to do this? > Extend Consumer#close with option to leave the group or not > --- > > Key: KAFKA-16758 > URL: https://issues.apache.org/jira/browse/KAFKA-16758 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Reporter: A. Sophie Blee-Goldman >Assignee: Lianet Magrans >Priority: Major > Labels: needs-kip > Fix For: 4.0.0 > > > See comments on https://issues.apache.org/jira/browse/KAFKA-16514 for the > full context. > Essentially we would get rid of the "internal.leave.group.on.close" config > that is used as a backdoor by Kafka Streams right now to prevent closed > consumers from leaving the group, thus reducing unnecessary task movements > after a simple bounce. > This would be replaced by an actual public API that would allow the caller to > opt in or out to the LeaveGroup when close is called. This would be similar > to the KafkaStreams#close(CloseOptions) API, and in fact would be how that > API will be implemented (since it only works for static groups at the moment > as noted in KAFKA-16514 ) > This has several benefits over the current situation: > # It allows plain consumer apps to opt-out of leaving the group when closed, > which is currently not possible through any public API (only an internal > backdoor config) > # It enables the caller to dynamically select the appropriate action > depending on why the client is being closed – for example, you would not want > the consumer to leave the group during a simple restart, but would want it to > leave the group when shutting down the app or if scaling down the node. This > is not possible today, even with the internal config, since configs are > immutable > # It can be leveraged to "fix" the KafkaStreams#close(closeOptions) API so > that the user's choice to leave the group during close will be respected for > non-static members -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17400: Added share fetch purgatory for delaying share fetch requests which cannot be completed because of reaching record lock partition limit [kafka]
junrao commented on code in PR #16969: URL: https://github.com/apache/kafka/pull/16969#discussion_r1739168653 ## core/src/main/java/kafka/server/share/DelayedShareFetch.java: ## @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server.share; + +import kafka.server.DelayedOperation; +import kafka.server.LogReadResult; +import kafka.server.QuotaFactory; +import kafka.server.ReplicaManager; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.storage.internals.log.FetchPartitionData; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import scala.Option; +import scala.Tuple2; +import scala.collection.Seq; +import scala.jdk.javaapi.CollectionConverters; +import scala.runtime.BoxedUnit; + +/** + * A delayed share fetch operation has been introduced in case there is no share partition for which we can acquire records. We will try to wait + * for MaxWaitMs for records to be released else complete the share fetch request. + */ +public class DelayedShareFetch extends DelayedOperation { +private final SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData; +private final ReplicaManager replicaManager; +private final Map partitionCacheMap; +private final Map topicPartitionDataFromTryComplete = new LinkedHashMap<>(); + +private static final Logger log = LoggerFactory.getLogger(DelayedShareFetch.class); + +DelayedShareFetch( +SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData, +ReplicaManager replicaManager, +Map partitionCacheMap) { +super(shareFetchPartitionData.fetchParams().maxWaitMs, Option.empty()); +this.shareFetchPartitionData = shareFetchPartitionData; +this.replicaManager = replicaManager; +this.partitionCacheMap = partitionCacheMap; +} + +@Override +public void onExpiration() { +} + +/** + * Complete the share fetch operation by fetching records for all partitions in the share fetch request irrespective + * of whether they have any acquired records. This is called when the fetch operation is forced to complete either Review Comment: > irrespective of whether they have any acquired records. This seems outdated? ## server/src/main/java/org/apache/kafka/server/config/ShareGroupConfig.java: ## @@ -63,14 +63,19 @@ public class ShareGroupConfig { public static final int SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DEFAULT = 6; public static final String SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DOC = "The record acquisition lock maximum duration in milliseconds for share groups."; +public static final String SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG = "share.fetch.purgatory.purge.interval.requests"; Review Comment: This config is not in the KIP. So we need to update the KIP. ## core/src/main/java/kafka/server/share/SharePartitionManager.java: ## @@ -564,16 +592,29 @@ void maybeProcessFetchQueue() { ); } else { sharePartition.releaseFetchLock(); -log.info("Record lock partition limit exceeded for SharePartition with key {}, " + -"cannot acquire more records", sharePartitionKey); } } }); -if (topicPartitionData.isEmpty()) { -// No locks for share partitions could be acquired, so we complete the request and -// will re-fetch for the client in next poll. +if (shareFetchPartitionData.partitionMaxBytes.isEmpty()) { Review Comment: Should we move this check earlier? ## core/src/main/java/kafka/server/share/SharePartitionManager.java: ## @@ -564,16 +592,29 @@ void maybeProcessFetchQueue() {
[jira] [Created] (KAFKA-17454) Fix failed transactions_mixed_versions_test.py when running with 3.2
Chia-Ping Tsai created KAFKA-17454: -- Summary: Fix failed transactions_mixed_versions_test.py when running with 3.2 Key: KAFKA-17454 URL: https://issues.apache.org/jira/browse/KAFKA-17454 Project: Kafka Issue Type: Test Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai 3.2.3 release does not include KAFKA-14259, so it will produce exception (shown below) when log level is DEBUG. Hence, we should change the log level from DEBUG to INFO for 3.2.3 for example: add `self.kafka.log_level = "INFO"` {code:java} Caused by: java.lang.ClassCastException: java.util.HashMap$Node cannot be cast to java.lang.Comparable at java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:47) at java.util.TimSort.countRunAndMakeAscending(TimSort.java:355) at java.util.TimSort.sort(TimSort.java:220) at java.util.Arrays.sort(Arrays.java:1512) at java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:353) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:483) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) at org.apache.kafka.metadata.BrokerRegistration.toString(BrokerRegistration.java:212) at java.util.Formatter$FormatSpecifier.printString(Formatter.java:2886) at java.util.Formatter$FormatSpecifier.print(Formatter.java:2763) at java.util.Formatter.format(Formatter.java:2520) at java.util.Formatter.format(Formatter.java:2455) at java.lang.String.format(String.java:2940) at java.util.Optional.toString(Optional.java:346) at java.lang.String.valueOf(String.java:2994) at java.lang.StringBuilder.append(StringBuilder.java:136) at java.util.AbstractMap.toString(AbstractMap.java:559) at java.lang.String.valueOf(String.java:2994) at java.lang.StringBuilder.append(StringBuilder.java:136) at org.apache.kafka.image.ClusterDelta.toString(ClusterDelta.java:132) at java.lang.String.valueOf(String.java:2994) at java.lang.StringBuilder.append(StringBuilder.java:136) at org.apache.kafka.image.MetadataDelta.toString(MetadataDelta.java:353) at java.lang.String.valueOf(String.java:2994) at java.lang.StringBuilder.append(StringBuilder.java:136) at kafka.server.metadata.BrokerMetadataListener.$anonfun$publish$1(BrokerMetadataListener.scala:274) at kafka.utils.Logging.debug(Logging.scala:62) at kafka.utils.Logging.debug$(Logging.scala:62) at kafka.server.metadata.BrokerMetadataListener.debug(BrokerMetadataListener.scala:37) at kafka.server.metadata.BrokerMetadataListener.kafka$server$metadata$BrokerMetadataListener$$publish(BrokerMetadataListener.scala:274) at kafka.server.metadata.BrokerMetadataListener$StartPublishingEvent.run(BrokerMetadataListener.scala:241) at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121) at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200) at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173) at java.lang.Thread.run(Thread.java:750) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] ISSUE-1014 Should fail in the queue [kafka-merge-queue-sandbox]
mumrah closed pull request #21: ISSUE-1014 Should fail in the queue URL: https://github.com/apache/kafka-merge-queue-sandbox/pull/21 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] Update README.md [kafka-merge-queue-sandbox]
mumrah opened a new pull request, #43: URL: https://github.com/apache/kafka-merge-queue-sandbox/pull/43 (no comment) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-17454) Fix failed transactions_mixed_versions_test.py when running with 3.2
[ https://issues.apache.org/jira/browse/KAFKA-17454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-17454: --- Fix Version/s: 4.0.0 3.9.0 > Fix failed transactions_mixed_versions_test.py when running with 3.2 > > > Key: KAFKA-17454 > URL: https://issues.apache.org/jira/browse/KAFKA-17454 > Project: Kafka > Issue Type: Test >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > Fix For: 4.0.0, 3.9.0 > > > 3.2.3 release does not include KAFKA-14259, so it will produce exception > (shown below) when log level is DEBUG. Hence, we should change the log level > from DEBUG to INFO for 3.2.3 > for example: add `self.kafka.log_level = "INFO"` > {code:java} > Caused by: java.lang.ClassCastException: java.util.HashMap$Node cannot be > cast to java.lang.Comparable > at > java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:47) > at java.util.TimSort.countRunAndMakeAscending(TimSort.java:355) > at java.util.TimSort.sort(TimSort.java:220) > at java.util.Arrays.sort(Arrays.java:1512) > at > java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:353) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:483) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at > java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) > at > org.apache.kafka.metadata.BrokerRegistration.toString(BrokerRegistration.java:212) > at java.util.Formatter$FormatSpecifier.printString(Formatter.java:2886) > at java.util.Formatter$FormatSpecifier.print(Formatter.java:2763) > at java.util.Formatter.format(Formatter.java:2520) > at java.util.Formatter.format(Formatter.java:2455) > at java.lang.String.format(String.java:2940) > at java.util.Optional.toString(Optional.java:346) > at java.lang.String.valueOf(String.java:2994) > at java.lang.StringBuilder.append(StringBuilder.java:136) > at java.util.AbstractMap.toString(AbstractMap.java:559) > at java.lang.String.valueOf(String.java:2994) > at java.lang.StringBuilder.append(StringBuilder.java:136) > at org.apache.kafka.image.ClusterDelta.toString(ClusterDelta.java:132) > at java.lang.String.valueOf(String.java:2994) > at java.lang.StringBuilder.append(StringBuilder.java:136) > at org.apache.kafka.image.MetadataDelta.toString(MetadataDelta.java:353) > at java.lang.String.valueOf(String.java:2994) > at java.lang.StringBuilder.append(StringBuilder.java:136) > at > kafka.server.metadata.BrokerMetadataListener.$anonfun$publish$1(BrokerMetadataListener.scala:274) > at kafka.utils.Logging.debug(Logging.scala:62) > at kafka.utils.Logging.debug$(Logging.scala:62) > at > kafka.server.metadata.BrokerMetadataListener.debug(BrokerMetadataListener.scala:37) > at > kafka.server.metadata.BrokerMetadataListener.kafka$server$metadata$BrokerMetadataListener$$publish(BrokerMetadataListener.scala:274) > at > kafka.server.metadata.BrokerMetadataListener$StartPublishingEvent.run(BrokerMetadataListener.scala:241) > at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173) > at java.lang.Thread.run(Thread.java:750) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17454) Fix failed transactions_mixed_versions_test.py when running with 3.2
[ https://issues.apache.org/jira/browse/KAFKA-17454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878223#comment-17878223 ] Chia-Ping Tsai commented on KAFKA-17454: noted: the error happens only if the size of supported features is bigger than 1 which trigger the "sort". That means the error happens when the `kraft.version` gets in, since there is already a feature `metadata.version` > Fix failed transactions_mixed_versions_test.py when running with 3.2 > > > Key: KAFKA-17454 > URL: https://issues.apache.org/jira/browse/KAFKA-17454 > Project: Kafka > Issue Type: Test >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > > 3.2.3 release does not include KAFKA-14259, so it will produce exception > (shown below) when log level is DEBUG. Hence, we should change the log level > from DEBUG to INFO for 3.2.3 > for example: add `self.kafka.log_level = "INFO"` > {code:java} > Caused by: java.lang.ClassCastException: java.util.HashMap$Node cannot be > cast to java.lang.Comparable > at > java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:47) > at java.util.TimSort.countRunAndMakeAscending(TimSort.java:355) > at java.util.TimSort.sort(TimSort.java:220) > at java.util.Arrays.sort(Arrays.java:1512) > at > java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:353) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:483) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at > java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) > at > org.apache.kafka.metadata.BrokerRegistration.toString(BrokerRegistration.java:212) > at java.util.Formatter$FormatSpecifier.printString(Formatter.java:2886) > at java.util.Formatter$FormatSpecifier.print(Formatter.java:2763) > at java.util.Formatter.format(Formatter.java:2520) > at java.util.Formatter.format(Formatter.java:2455) > at java.lang.String.format(String.java:2940) > at java.util.Optional.toString(Optional.java:346) > at java.lang.String.valueOf(String.java:2994) > at java.lang.StringBuilder.append(StringBuilder.java:136) > at java.util.AbstractMap.toString(AbstractMap.java:559) > at java.lang.String.valueOf(String.java:2994) > at java.lang.StringBuilder.append(StringBuilder.java:136) > at org.apache.kafka.image.ClusterDelta.toString(ClusterDelta.java:132) > at java.lang.String.valueOf(String.java:2994) > at java.lang.StringBuilder.append(StringBuilder.java:136) > at org.apache.kafka.image.MetadataDelta.toString(MetadataDelta.java:353) > at java.lang.String.valueOf(String.java:2994) > at java.lang.StringBuilder.append(StringBuilder.java:136) > at > kafka.server.metadata.BrokerMetadataListener.$anonfun$publish$1(BrokerMetadataListener.scala:274) > at kafka.utils.Logging.debug(Logging.scala:62) > at kafka.utils.Logging.debug$(Logging.scala:62) > at > kafka.server.metadata.BrokerMetadataListener.debug(BrokerMetadataListener.scala:37) > at > kafka.server.metadata.BrokerMetadataListener.kafka$server$metadata$BrokerMetadataListener$$publish(BrokerMetadataListener.scala:274) > at > kafka.server.metadata.BrokerMetadataListener$StartPublishingEvent.run(BrokerMetadataListener.scala:241) > at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173) > at java.lang.Thread.run(Thread.java:750) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17454) Fix failed transactions_mixed_versions_test.py when running with 3.2
[ https://issues.apache.org/jira/browse/KAFKA-17454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-17454: --- Priority: Blocker (was: Major) > Fix failed transactions_mixed_versions_test.py when running with 3.2 > > > Key: KAFKA-17454 > URL: https://issues.apache.org/jira/browse/KAFKA-17454 > Project: Kafka > Issue Type: Test >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Blocker > Fix For: 4.0.0, 3.9.0 > > > 3.2.3 release does not include KAFKA-14259, so it will produce exception > (shown below) when log level is DEBUG. Hence, we should change the log level > from DEBUG to INFO for 3.2.3 > for example: add `self.kafka.log_level = "INFO"` > {code:java} > Caused by: java.lang.ClassCastException: java.util.HashMap$Node cannot be > cast to java.lang.Comparable > at > java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:47) > at java.util.TimSort.countRunAndMakeAscending(TimSort.java:355) > at java.util.TimSort.sort(TimSort.java:220) > at java.util.Arrays.sort(Arrays.java:1512) > at > java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:353) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:483) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at > java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) > at > org.apache.kafka.metadata.BrokerRegistration.toString(BrokerRegistration.java:212) > at java.util.Formatter$FormatSpecifier.printString(Formatter.java:2886) > at java.util.Formatter$FormatSpecifier.print(Formatter.java:2763) > at java.util.Formatter.format(Formatter.java:2520) > at java.util.Formatter.format(Formatter.java:2455) > at java.lang.String.format(String.java:2940) > at java.util.Optional.toString(Optional.java:346) > at java.lang.String.valueOf(String.java:2994) > at java.lang.StringBuilder.append(StringBuilder.java:136) > at java.util.AbstractMap.toString(AbstractMap.java:559) > at java.lang.String.valueOf(String.java:2994) > at java.lang.StringBuilder.append(StringBuilder.java:136) > at org.apache.kafka.image.ClusterDelta.toString(ClusterDelta.java:132) > at java.lang.String.valueOf(String.java:2994) > at java.lang.StringBuilder.append(StringBuilder.java:136) > at org.apache.kafka.image.MetadataDelta.toString(MetadataDelta.java:353) > at java.lang.String.valueOf(String.java:2994) > at java.lang.StringBuilder.append(StringBuilder.java:136) > at > kafka.server.metadata.BrokerMetadataListener.$anonfun$publish$1(BrokerMetadataListener.scala:274) > at kafka.utils.Logging.debug(Logging.scala:62) > at kafka.utils.Logging.debug$(Logging.scala:62) > at > kafka.server.metadata.BrokerMetadataListener.debug(BrokerMetadataListener.scala:37) > at > kafka.server.metadata.BrokerMetadataListener.kafka$server$metadata$BrokerMetadataListener$$publish(BrokerMetadataListener.scala:274) > at > kafka.server.metadata.BrokerMetadataListener$StartPublishingEvent.run(BrokerMetadataListener.scala:241) > at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173) > at java.lang.Thread.run(Thread.java:750) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] Update README.md [kafka-merge-queue-sandbox]
mumrah merged PR #43: URL: https://github.com/apache/kafka-merge-queue-sandbox/pull/43 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-17454) Fix failed transactions_mixed_versions_test.py when running with 3.2
[ https://issues.apache.org/jira/browse/KAFKA-17454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-17454: --- Fix Version/s: (was: 3.9.0) > Fix failed transactions_mixed_versions_test.py when running with 3.2 > > > Key: KAFKA-17454 > URL: https://issues.apache.org/jira/browse/KAFKA-17454 > Project: Kafka > Issue Type: Test >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Blocker > Fix For: 4.0.0 > > > 3.2.3 release does not include KAFKA-14259, so it will produce exception > (shown below) when log level is DEBUG. Hence, we should change the log level > from DEBUG to INFO for 3.2.3 > for example: add `self.kafka.log_level = "INFO"` > {code:java} > Caused by: java.lang.ClassCastException: java.util.HashMap$Node cannot be > cast to java.lang.Comparable > at > java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:47) > at java.util.TimSort.countRunAndMakeAscending(TimSort.java:355) > at java.util.TimSort.sort(TimSort.java:220) > at java.util.Arrays.sort(Arrays.java:1512) > at > java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:353) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:483) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at > java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) > at > org.apache.kafka.metadata.BrokerRegistration.toString(BrokerRegistration.java:212) > at java.util.Formatter$FormatSpecifier.printString(Formatter.java:2886) > at java.util.Formatter$FormatSpecifier.print(Formatter.java:2763) > at java.util.Formatter.format(Formatter.java:2520) > at java.util.Formatter.format(Formatter.java:2455) > at java.lang.String.format(String.java:2940) > at java.util.Optional.toString(Optional.java:346) > at java.lang.String.valueOf(String.java:2994) > at java.lang.StringBuilder.append(StringBuilder.java:136) > at java.util.AbstractMap.toString(AbstractMap.java:559) > at java.lang.String.valueOf(String.java:2994) > at java.lang.StringBuilder.append(StringBuilder.java:136) > at org.apache.kafka.image.ClusterDelta.toString(ClusterDelta.java:132) > at java.lang.String.valueOf(String.java:2994) > at java.lang.StringBuilder.append(StringBuilder.java:136) > at org.apache.kafka.image.MetadataDelta.toString(MetadataDelta.java:353) > at java.lang.String.valueOf(String.java:2994) > at java.lang.StringBuilder.append(StringBuilder.java:136) > at > kafka.server.metadata.BrokerMetadataListener.$anonfun$publish$1(BrokerMetadataListener.scala:274) > at kafka.utils.Logging.debug(Logging.scala:62) > at kafka.utils.Logging.debug$(Logging.scala:62) > at > kafka.server.metadata.BrokerMetadataListener.debug(BrokerMetadataListener.scala:37) > at > kafka.server.metadata.BrokerMetadataListener.kafka$server$metadata$BrokerMetadataListener$$publish(BrokerMetadataListener.scala:274) > at > kafka.server.metadata.BrokerMetadataListener$StartPublishingEvent.run(BrokerMetadataListener.scala:241) > at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173) > at java.lang.Thread.run(Thread.java:750) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17454) Fix failed transactions_mixed_versions_test.py when running with 3.2
[ https://issues.apache.org/jira/browse/KAFKA-17454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-17454: --- Fix Version/s: 3.9.0 > Fix failed transactions_mixed_versions_test.py when running with 3.2 > > > Key: KAFKA-17454 > URL: https://issues.apache.org/jira/browse/KAFKA-17454 > Project: Kafka > Issue Type: Test >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Blocker > Fix For: 4.0.0, 3.9.0 > > > 3.2.3 release does not include KAFKA-14259, so it will produce exception > (shown below) when log level is DEBUG. Hence, we should change the log level > from DEBUG to INFO for 3.2.3 > for example: add `self.kafka.log_level = "INFO"` > {code:java} > Caused by: java.lang.ClassCastException: java.util.HashMap$Node cannot be > cast to java.lang.Comparable > at > java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:47) > at java.util.TimSort.countRunAndMakeAscending(TimSort.java:355) > at java.util.TimSort.sort(TimSort.java:220) > at java.util.Arrays.sort(Arrays.java:1512) > at > java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:353) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:483) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at > java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) > at > org.apache.kafka.metadata.BrokerRegistration.toString(BrokerRegistration.java:212) > at java.util.Formatter$FormatSpecifier.printString(Formatter.java:2886) > at java.util.Formatter$FormatSpecifier.print(Formatter.java:2763) > at java.util.Formatter.format(Formatter.java:2520) > at java.util.Formatter.format(Formatter.java:2455) > at java.lang.String.format(String.java:2940) > at java.util.Optional.toString(Optional.java:346) > at java.lang.String.valueOf(String.java:2994) > at java.lang.StringBuilder.append(StringBuilder.java:136) > at java.util.AbstractMap.toString(AbstractMap.java:559) > at java.lang.String.valueOf(String.java:2994) > at java.lang.StringBuilder.append(StringBuilder.java:136) > at org.apache.kafka.image.ClusterDelta.toString(ClusterDelta.java:132) > at java.lang.String.valueOf(String.java:2994) > at java.lang.StringBuilder.append(StringBuilder.java:136) > at org.apache.kafka.image.MetadataDelta.toString(MetadataDelta.java:353) > at java.lang.String.valueOf(String.java:2994) > at java.lang.StringBuilder.append(StringBuilder.java:136) > at > kafka.server.metadata.BrokerMetadataListener.$anonfun$publish$1(BrokerMetadataListener.scala:274) > at kafka.utils.Logging.debug(Logging.scala:62) > at kafka.utils.Logging.debug$(Logging.scala:62) > at > kafka.server.metadata.BrokerMetadataListener.debug(BrokerMetadataListener.scala:37) > at > kafka.server.metadata.BrokerMetadataListener.kafka$server$metadata$BrokerMetadataListener$$publish(BrokerMetadataListener.scala:274) > at > kafka.server.metadata.BrokerMetadataListener$StartPublishingEvent.run(BrokerMetadataListener.scala:241) > at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173) > at java.lang.Thread.run(Thread.java:750) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR Handle test re-runs in junit.py [kafka]
mumrah commented on code in PR #17034: URL: https://github.com/apache/kafka/pull/17034#discussion_r1739299366 ## .github/scripts/junit.py: ## @@ -148,29 +168,73 @@ def pretty_time_duration(seconds: float) -> str: total_failures += suite.failures total_errors += suite.errors total_time += suite.time -for test_failure in suite.test_failures: + +# Due to how the Develocity Test Retry plugin interacts with our geneated ClusterTests, we can see +# tests pass and then fail in the same run. Because of this, we need to capture all passed and all +# failed for each suite. Then we can find flakes by taking the intersection of those two. +all_suite_passed = {test.key() for test in suite.passed_tests} +all_suite_failed = {test.key() for test in suite.failed_tests} +flaky = all_suite_passed & all_suite_failed +total_flaky += len(flaky) + +# Display failures first +for test_failure in suite.failed_tests: +if test_failure.key() in flaky: +continue logger.debug(f"Found test failure: {test_failure}") simple_class_name = test_failure.class_name.split(".")[-1] -table.append(("❌", simple_class_name, test_failure.test_name, test_failure.failure_message, f"{test_failure.time:0.2f}s")) +failed_table.append((simple_class_name, test_failure.test_name, test_failure.failure_message, f"{test_failure.time:0.2f}s")) +for test_failure in suite.failed_tests: +if test_failure.key() not in flaky: +continue +logger.debug(f"Found flaky test: {test_failure}") +simple_class_name = test_failure.class_name.split(".")[-1] +flaky_table.append((simple_class_name, test_failure.test_name, test_failure.failure_message, f"{test_failure.time:0.2f}s")) for skipped_test in suite.skipped_tests: simple_class_name = skipped_test.class_name.split(".")[-1] logger.debug(f"Found skipped test: {skipped_test}") -table.append(("⚠️", simple_class_name, skipped_test.test_name, "Skipped", "")) +skipped_table.append((simple_class_name, skipped_test.test_name)) duration = pretty_time_duration(total_time) +logger.info(f"Finished processing {len(reports)} reports") # Print summary report_url = get_env("REPORT_URL") report_md = f"Download [HTML report]({report_url})." -summary = f"{total_tests} tests run in {duration}, {total_failures} failed ❌, {total_skipped} skipped ⚠️, {total_errors} errors." -logger.debug(summary) +summary = f"{total_tests} tests run in {duration}, {total_failures} {FAILED}, {total_flaky} {FLAKY}, {total_skipped} {SKIPPED}, and {total_errors} errors." Review Comment: Thanks, I noticed this too. Latest commit should fix this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] Testing 2 [kafka]
mumrah opened a new pull request, #17062: URL: https://github.com/apache/kafka/pull/17062 (no comment) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17442: Handled persister errors with write async calls (KIP-932) [kafka]
junrao commented on code in PR #16956: URL: https://github.com/apache/kafka/pull/16956#discussion_r1739379496 ## core/src/test/java/kafka/server/share/SharePartitionTest.java: ## @@ -4191,11 +4337,11 @@ public void testAcknowledgeSubsetWithAnotherMember() { Collections.singletonList(new ShareAcknowledgementBatch(5, 7, Collections.singletonList((byte) 1; // Acknowledge subset with another member. -CompletableFuture> ackResult = sharePartition.acknowledge("member-2", +CompletableFuture ackResult = sharePartition.acknowledge("member-2", Collections.singletonList(new ShareAcknowledgementBatch(9, 11, Collections.singletonList((byte) 1; -assertFalse(ackResult.isCompletedExceptionally()); -assertTrue(ackResult.join().isPresent()); -assertEquals(InvalidRecordStateException.class, ackResult.join().get().getClass()); +//assertNull(ackResult.join()); Review Comment: Should we remove this code? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17400: Added share fetch purgatory for delaying share fetch requests which cannot be completed because of reaching record lock partition limit [kafka]
junrao commented on code in PR #16969: URL: https://github.com/apache/kafka/pull/16969#discussion_r1739387577 ## core/src/main/java/kafka/server/share/SharePartitionManager.java: ## @@ -498,10 +519,17 @@ List cachedTopicIdPartitionsInShareSession(String groupId, Uui return cachedTopicIdPartitions; } +// Add the share fetch request to the delayed share fetch purgatory to process the fetch request if it can be +// completed else watch until it can be completed/timeout. +private void addDelayedShareFetch(DelayedShareFetch delayedShareFetch, Set keys) { + this.delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch, CollectionConverters.asScala(keys).toSeq()); Review Comment: We want to be consistent with using the instance val. In other places, we don't use `this`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka 8850: Updated documentation to clarify fetch.min.bytes behaviour. [kafka]
mjsax commented on PR #16749: URL: https://github.com/apache/kafka/pull/16749#issuecomment-2322298335 \cc @lianetm for review -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org