Re: [PR] KAFKA-18001: Support UpdateRaftVoterRequest in KafkaNetworkChannel (WIP) [kafka]
justinrlee commented on PR #17773:
URL: https://github.com/apache/kafka/pull/17773#issuecomment-2472928978

Thanks, @ahuang98! I have updated the PR title and will look at adding tests today or tomorrow.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18004: Use 3.8 to run zk service for e2e [kafka]
chia7712 commented on code in PR #17790:
URL: https://github.com/apache/kafka/pull/17790#discussion_r1839513727

## tests/kafkatest/services/zookeeper.py:
```
@@ -44,7 +44,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
     }

     def __init__(self, context, num_nodes, zk_sasl = False, zk_client_port = True, zk_client_secure_port = False,
-                 zk_tls_encrypt_only = False, version=DEV_BRANCH):
```

Review Comment: We need to replace all dev by 3.8 for this file
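The requested change amounts to defaulting the ZooKeeper service to the 3.8 release instead of the dev branch; a minimal sketch of the idea (the constant values below are stand-ins for kafkatest's real version objects, not the actual code):

```python
# Hypothetical stand-ins for kafkatest.version constants.
DEV_BRANCH = "dev"
V_3_8_0 = "3.8.0"

class ZookeeperService:
    # Old brokers under test still need a ZooKeeper service, but the dev
    # branch (4.0) drops ZooKeeper, so default the service to the 3.8 release.
    def __init__(self, context, num_nodes, zk_sasl=False, zk_client_port=True,
                 zk_client_secure_port=False, zk_tls_encrypt_only=False,
                 version=V_3_8_0):  # was: version=DEV_BRANCH
        self.context = context
        self.num_nodes = num_nodes
        self.version = version

svc = ZookeeperService(context=None, num_nodes=1)
print(svc.version)  # -> 3.8.0
```

The review asks for the same replacement everywhere the file references the dev branch, not only at this default.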
Re: [PR] KAFKA-17232: MirrorCheckpointConnector does not generate task configs if initial consumer group load times out. [kafka]
Gakhramanzode commented on PR #16767:
URL: https://github.com/apache/kafka/pull/16767#issuecomment-2472664603

@frankvicky @Hongten @C0urante Hello,

We've encountered an issue that seems to be related to the changes in this pull request.

**Context:**
- **Kafka Version:** Upgraded from 3.6.0 to 3.9.0
- **Setup:** Using MirrorMaker 2 to mirror topics between clusters
- **Issue:** After upgrading, we received repeated errors from `MirrorCheckpointConnector` stating:
```bash
Failed to reconfigure connector's tasks (MirrorCheckpointConnector), retrying after backoff.
org.apache.kafka.connect.errors.RetriableException: Timeout while loading consumer groups.
```

**Attempted Solutions:**
- Increased `admin.timeout.ms` and other timeout settings
- Verified ACLs and authentication
- None of these steps resolved the issue

**Resolution:**
**Downgraded to Kafka 3.8.1:** The error disappeared after downgrading, suggesting the issue was introduced in 3.9.0.

**Observations:**
- Our clusters are relatively small (about 20 consumer groups), so a timeout during the initial consumer group load seems unlikely under normal circumstances.
- It appears that the changes made in this pull request might have inadvertently caused this behavior.

**Questions:**
1. Is there a recommended workaround or configuration adjustment to prevent this error in Kafka 3.9.0?
2. Are there plans to address this issue in an upcoming release?

We appreciate any guidance you can provide. Thank you for your efforts in maintaining and improving Kafka.

Best regards
[jira] [Resolved] (KAFKA-18004) Use version 3.8 to run the ZooKeeper service for end-to-end tests
[ https://issues.apache.org/jira/browse/KAFKA-18004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai resolved KAFKA-18004.
------------------------------------
    Fix Version/s: 4.0.0
       Resolution: Fixed

> Use version 3.8 to run the ZooKeeper service for end-to-end tests
> -----------------------------------------------------------------
>
>                 Key: KAFKA-18004
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18004
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Chia-Ping Tsai
>            Assignee: PoAn Yang
>            Priority: Major
>             Fix For: 4.0.0
>
> We plan to remove all ZooKeeper-related code in version 4.0. However, some old brokers in the end-to-end tests still require ZooKeeper service, so we need to run the ZooKeeper service using the 3.x release instead of the dev branch.
> Since version 3.9 is not available in the https://s3-us-west-2.amazonaws.com/kafka-packages repo, we can use version 3.8 for now.
> https://github.com/apache/kafka/blob/trunk/tests/kafkatest/services/zookeeper.py#L47

--
This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-18004) Use version 3.8 to run the ZooKeeper service for end-to-end tests
[ https://issues.apache.org/jira/browse/KAFKA-18004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai updated KAFKA-18004:
-----------------------------------
    Summary: Use version 3.8 to run the ZooKeeper service for end-to-end tests  (was: Use 3.8 to run zk service for e2e)

> Use version 3.8 to run the ZooKeeper service for end-to-end tests
> -----------------------------------------------------------------
>
>                 Key: KAFKA-18004
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18004
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Chia-Ping Tsai
>            Assignee: PoAn Yang
>            Priority: Major
>
> We plan to remove all ZooKeeper-related code in version 4.0. However, some old brokers in the end-to-end tests still require ZooKeeper service, so we need to run the ZooKeeper service using the 3.x release instead of the dev branch.
> Since version 3.9 is not available in the https://s3-us-west-2.amazonaws.com/kafka-packages repo, we can use version 3.8 for now.
> https://github.com/apache/kafka/blob/trunk/tests/kafkatest/services/zookeeper.py#L47
Re: [PR] [DO NOT MERGE] PoC for ProcessingExceptionHandler [kafka]
cadonna closed pull request #16137: [DO NOT MERGE] PoC for ProcessingExceptionHandler URL: https://github.com/apache/kafka/pull/16137
Re: [PR] KAFKA-17995: Large value for retention.ms could prevent remote data cleanup in Tiered Storage [kafka]
divijvaidya commented on code in PR #17794:
URL: https://github.com/apache/kafka/pull/17794#discussion_r1840213747

## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
```
@@ -2765,6 +2765,34 @@ public void testDeleteRetentionMsBeingCancelledBeforeSecondDelete() throws Remot
     verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(1));
 }

+@Test
+public void testDeleteRetentionMsBiggerThanTimeMs() throws RemoteStorageException, ExecutionException, InterruptedException {
+    LogConfig mockLogConfig = new LogConfig(Map.of("retention.ms", time.milliseconds() + 100));
```

Review Comment: If, let's say, the test is running on a slow machine and 100ms has already elapsed by the time we reach building `RetentionTimeData`, then our cleanupUntilMs will actually become positive, leading to a flaky test. Could we set this value to a large value, such as 1 month after the current epoch?
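The suggested fix — pinning `retention.ms` well past the current epoch instead of only 100ms ahead — can be sketched outside the Kafka test harness. This is plain Java with hypothetical names, illustrating the arithmetic, not the PR's actual test code:

```java
import java.time.Duration;

public class RetentionWindowSketch {
    public static void main(String[] args) {
        long nowMs = System.currentTimeMillis();

        // Fragile choice: only 100 ms of headroom. Any setup delay longer than
        // that makes (now - retentionMs) positive, i.e. a flaky test.
        long fragileRetentionMs = nowMs + 100;
        System.out.println("fragile headroom ms: " + (fragileRetentionMs - nowMs));

        // Robust choice: one month past the current epoch, as the review suggests.
        long robustRetentionMs = nowMs + Duration.ofDays(30).toMillis();

        // A positive cleanup bound would mean "some remote data is eligible for
        // deletion"; with a month of headroom it stays far below zero.
        long cleanupUntilMs = System.currentTimeMillis() - robustRetentionMs;
        if (cleanupUntilMs >= 0) {
            throw new AssertionError("retention window unexpectedly elapsed");
        }
        System.out.println("no cleanup eligible: " + (cleanupUntilMs < 0));
    }
}
```

The point is that the test's correctness should not depend on how fast the machine reaches the assertion, only on the sign of the cleanup bound.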
[jira] [Resolved] (KAFKA-17987) Remove assorted ZK-related files
[ https://issues.apache.org/jira/browse/KAFKA-17987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai resolved KAFKA-17987.
------------------------------------
    Fix Version/s: 4.0.0
       Resolution: Fixed

> Remove assorted ZK-related files
> --------------------------------
>
>                 Key: KAFKA-17987
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17987
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Colin McCabe
>            Assignee: Colin McCabe
>            Priority: Major
>             Fix For: 4.0.0
Re: [PR] KAFKA-17885: Enable clients to rebootstrap based on timeout or error code (KIP-1102) [kafka]
rajinisivaram commented on code in PR #17720:
URL: https://github.com/apache/kafka/pull/17720#discussion_r1840224877

## core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala:
```
@@ -87,3 +128,17 @@ class ConsumerRebootstrapTest extends RebootstrapTest {
     producer.close()
   }
 }
+
+object ConsumerRebootstrapTest {
+
+  final val RebootstrapTestName = s"${TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames}.useRebootstrapTriggerMs={2}"
+  def rebootstrapTestParams: stream.Stream[Arguments] = {
+    assertEquals(1, getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit.count())
```

Review Comment: @omkreddy Thanks for the review. The code below is using only the first entry. At the moment there is only one entry, but I wanted to make sure that if we added more in future, the test would get updated to include all.
Re: [PR] MINOR Fix a few test names [kafka]
chia7712 commented on code in PR #17788:
URL: https://github.com/apache/kafka/pull/17788#discussion_r1840221645

## streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockProcessorContextStateStoreTest.java:
```
@@ -155,7 +155,7 @@ public static Stream parameters() {
     return values.stream();
 }

-@ParameterizedTest(name = "builder = {0}, timestamped = {1}, caching = {2}, logging = {3}")
+@ParameterizedTest(name = "shouldEitherInitOrThrow(builder = {0}, timestamped = {1}, caching = {2}, logging = {3})")
```

Review Comment: Could you please consider using `{displayName}` instead? This would help ensure naming consistency. Additionally, we should address the following naming issues as well:
https://github.com/apache/kafka/blob/test-catalog/test-catalog/clients/tests.yaml#L817C3-L817C13
https://github.com/apache/kafka/blob/test-catalog/test-catalog/clients/tests.yaml#L3617
https://github.com/apache/kafka/blob/test-catalog/test-catalog/storage/tests.yaml#L30
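For context, JUnit 5's `{displayName}` placeholder in a `@ParameterizedTest` name pattern expands to the test method's display name, so the suggested form would look roughly like the sketch below (a hedged illustration of the reviewer's idea, not the final PR code):

```java
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

class NamingSketch {
    // {displayName} expands to the method's display name, so renaming the
    // method keeps the parameterized invocation names in sync automatically,
    // instead of hard-coding "shouldEitherInitOrThrow" into the pattern.
    @ParameterizedTest(name = "{displayName}(builder = {0}, timestamped = {1}, caching = {2}, logging = {3})")
    @MethodSource("parameters")
    void shouldEitherInitOrThrow(Object builder, boolean timestamped, boolean caching, boolean logging) {
        // test body elided
    }
}
```

This is why the reviewer frames it as a naming-consistency fix rather than a behavior change.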
[PR] Integrate Streams membership manager with stream thread [kafka]
cadonna opened a new pull request, #17795:
URL: https://github.com/apache/kafka/pull/17795

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-17885: Enable clients to rebootstrap based on timeout or error code (KIP-1102) [kafka]
rajinisivaram merged PR #17720: URL: https://github.com/apache/kafka/pull/17720
Re: [PR] KAFKA-17885: Enable clients to rebootstrap based on timeout or error code (KIP-1102) [kafka]
rajinisivaram commented on PR #17720:
URL: https://github.com/apache/kafka/pull/17720#issuecomment-2473551972

@apoorvmittal10 @omkreddy Thanks for the reviews. @apoorvmittal10 If you have follow-on comments, I can address those in a separate PR. Merging this to trunk.
Re: [PR] KAFKA-17850: fix leaking internal exception in state manager [kafka]
cadonna commented on PR #17711:
URL: https://github.com/apache/kafka/pull/17711#issuecomment-2472783813

Could you please add a description to the PR so that it is clear what it changes? Linking to the JIRA ticket is not really needed, since the ticket number can be found in the title.
[jira] [Resolved] (KAFKA-17885) Enable clients to rebootstrap based on timeout or error code
[ https://issues.apache.org/jira/browse/KAFKA-17885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram resolved KAFKA-17885.
------------------------------------
    Fix Version/s: 4.0.0
         Reviewer: Manikumar
       Resolution: Fixed

> Enable clients to rebootstrap based on timeout or error code
> ------------------------------------------------------------
>
>                 Key: KAFKA-17885
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17885
>             Project: Kafka
>          Issue Type: New Feature
>          Components: clients
>    Affects Versions: 3.9.0
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Major
>             Fix For: 4.0.0
>
> See https://cwiki.apache.org/confluence/display/KAFKA/KIP-1102%3A+Enable+clients+to+rebootstrap+based+on+timeout+or+error+code for details.
[jira] [Commented] (KAFKA-17829) Verify ShareFetch requests return a completed/erroneous future on purgatory close
[ https://issues.apache.org/jira/browse/KAFKA-17829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17897916#comment-17897916 ]

Abhinav Dixit commented on KAFKA-17829:
---------------------------------------
We also ran tests in a multi-broker setup such that share fetch requests are in a pending state within the delayed share fetch purgatory, and then we close the broker. We see regular consumption for the share consumers on the other brokers. Tests were also run along the lines of leader change and topic partition deletion. In all such scenarios, we confirm that even if share fetch requests are pending in the purgatory and we close the broker, consumption continues smoothly on the other brokers.

> Verify ShareFetch requests return a completed/erroneous future on purgatory close
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-17829
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17829
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Abhinav Dixit
>            Assignee: Abhinav Dixit
>            Priority: Major
>
> We need to verify that on shutdown of the delayed share fetch purgatory, the share fetch requests which are present inside the purgatory return with an erroneous future or a completed future.
[jira] [Resolved] (KAFKA-17829) Verify ShareFetch requests return a completed/erroneous future on purgatory close
[ https://issues.apache.org/jira/browse/KAFKA-17829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Abhinav Dixit resolved KAFKA-17829.
-----------------------------------
    Resolution: Not A Bug

> Verify ShareFetch requests return a completed/erroneous future on purgatory close
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-17829
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17829
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Abhinav Dixit
>            Assignee: Abhinav Dixit
>            Priority: Major
>
> We need to verify that on shutdown of the delayed share fetch purgatory, the share fetch requests which are present inside the purgatory return with an erroneous future or a completed future.
[jira] [Commented] (KAFKA-17957) Console share consumer unable to consume all messages when using multiple brokers
[ https://issues.apache.org/jira/browse/KAFKA-17957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17897917#comment-17897917 ]

Abhinav Dixit commented on KAFKA-17957:
---------------------------------------
The way I had set up my local multi-broker Kafka environment was incorrect. Hence, this is not an issue.

> Console share consumer unable to consume all messages when using multiple brokers
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-17957
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17957
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Abhinav Dixit
>            Priority: Major
>
> I tried a console share consumer using a multi-broker Kafka cluster. After a while, the consumption seemed to fail. It looks like there could be disconnects happening which cause issues in message consumption.
[jira] [Created] (KAFKA-18005) Return metadata of sensitive config when describe config
Luke Chen created KAFKA-18005:
------------------------------

             Summary: Return metadata of sensitive config when describe config
                 Key: KAFKA-18005
                 URL: https://issues.apache.org/jira/browse/KAFKA-18005
             Project: Kafka
          Issue Type: Improvement
            Reporter: Luke Chen
            Assignee: Luke Chen

Currently, when describing config for a resource, we'll get `null` if the config is a sensitive config, ex: "ssl.keystore.certificate.chain", "ssl.keystore.password". And when describing configs with them it'll always return something like this:

{code:java}
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 2 --describe
Dynamic configs for broker 2 are:
  listener.name.plantext.ssl.keystore.key=null sensitive=true synonyms={DYNAMIC_BROKER_CONFIG:listener.name.plantext.ssl.keystore.key=null}
{code}

It would be great if the broker can return some metadata of these sensitive configs, like last modified timestamp, to allow readers (ex: the operator) to know if this is an outdated value.
[jira] [Updated] (KAFKA-18005) Return metadata of sensitive config when describe config
[ https://issues.apache.org/jira/browse/KAFKA-18005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen updated KAFKA-18005:
------------------------------
    Description:
Currently, when describing config for a resource, we'll get `null` if the config is a sensitive config, ex: "ssl.keystore.certificate.chain", "ssl.keystore.password". And when describing configs with them it'll always return something like this:
{code:java}
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 2 --describe
Dynamic configs for broker 2 are:
  listener.name.myssl.ssl.keystore.key=null sensitive=true synonyms={DYNAMIC_BROKER_CONFIG:listener.name.myssl.ssl.keystore.key=null}
{code}
It would be great if the broker can return some metadata of these sensitive configs, like last modified timestamp, to allow readers (ex: the operator) to know if this is an outdated value.

  was: the same description with "listener.name.plantext.ssl.keystore.key" in place of "listener.name.myssl.ssl.keystore.key"

> Return metadata of sensitive config when describe config
> --------------------------------------------------------
>
>                 Key: KAFKA-18005
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18005
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Luke Chen
>            Assignee: Luke Chen
>            Priority: Major
>
> Currently, when describing config for a resource, we'll get `null` if the config is a sensitive config, ex: "ssl.keystore.certificate.chain", "ssl.keystore.password". And when describing configs with them it'll always return something like this:
> {code:java}
> > bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 2 --describe
> Dynamic configs for broker 2 are:
>   listener.name.myssl.ssl.keystore.key=null sensitive=true synonyms={DYNAMIC_BROKER_CONFIG:listener.name.myssl.ssl.keystore.key=null}
> {code}
> It would be great if the broker can return some metadata of these sensitive configs, like last modified timestamp, to allow readers (ex: the operator) to know if this is an outdated value.
Re: [PR] KAFKA-18004: Use 3.8 to run zk service for e2e [kafka]
chia7712 merged PR #17790: URL: https://github.com/apache/kafka/pull/17790
Re: [PR] KAFKA-17987: Remove assorted ZK-related files [kafka]
chia7712 merged PR #17768: URL: https://github.com/apache/kafka/pull/17768
[jira] [Resolved] (KAFKA-17957) Console share consumer unable to consume all messages when using multiple brokers
[ https://issues.apache.org/jira/browse/KAFKA-17957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Abhinav Dixit resolved KAFKA-17957.
-----------------------------------
    Resolution: Invalid

> Console share consumer unable to consume all messages when using multiple brokers
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-17957
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17957
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Abhinav Dixit
>            Priority: Major
>
> I tried a console share consumer using a multi-broker Kafka cluster. After a while, the consumption seemed to fail. It looks like there could be disconnects happening which cause issues in message consumption.
[jira] [Commented] (KAFKA-15796) High CPU issue in Kafka Producer when Auth Failed
[ https://issues.apache.org/jira/browse/KAFKA-15796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17897855#comment-17897855 ]

Alexandru Oiegas commented on KAFKA-15796:
------------------------------------------
Hi, we have the same issue. Can this be fixed with higher prio?

> High CPU issue in Kafka Producer when Auth Failed
> -------------------------------------------------
>
>                 Key: KAFKA-15796
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15796
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, producer
>    Affects Versions: 3.2.2, 3.2.3, 3.3.1, 3.3.2, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>            Reporter: xiaotong.wang
>            Priority: Major
>         Attachments: image-2023-11-07-14-18-32-016.png
>
> How to reproduce:
> 1. kafka-client 3.x.x producer config enable.idempotence=true (this is the default)
> 2. Start the Kafka server without the client user's auth info
> 3. Start the client producer; since 3.x, the producer calls initProducerId and the transaction manager state transitions to INITIALIZING
> 4. The server rejects the client request, and the producer raises AuthenticationException (org.apache.kafka.clients.producer.internals.Sender#maybeSendAndPollTransactionalRequest)
> 5. org.apache.kafka.clients.producer.internals.Sender#runOnce catches the AuthenticationException and calls transactionManager.authenticationFailed(e):
> {code:java}
> synchronized void authenticationFailed(AuthenticationException e) {
>     for (TxnRequestHandler request : pendingRequests)
>         request.fatalError(e);
> }
> {code}
> This method only handles pendingRequests; the in-flight request is missed.
> 6. The transaction manager state stays in INITIALIZING forever, because of the judgment condition: currentState != State.INITIALIZING && !hasProducerId()
> 7. The producer sends a message; it goes into the batch queue, and the Sender wakes up and sets pollTimeout=0, preparing to send the message
> 8. But before Sender#sendProducerData, messages are filtered: RecordAccumulator#drain --> drainBatchesForOneNode --> shouldStopDrainBatchesForPartition returns true when producerIdAndEpoch.isValid() == false, so it does not collect any messages
> 9. Now the Kafka producer network thread's CPU usage goes to 100%
> 10. Even after we add the user auth info and permission on the Kafka server, the client cannot self-heal
>
> Suggestion: also catch AuthenticationException in org.apache.kafka.clients.producer.internals.Sender#maybeSendAndPollTransactionalRequest and respond with a failure to the in-flight InitProducerId request
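The root cause described in step 5 — failing only the queued requests while the in-flight InitProducerId request is forgotten — can be illustrated with a stripped-down model. All names here are hypothetical stand-ins, not Kafka's actual classes:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class AuthFailureSketch {
    enum State { INITIALIZING, FATAL }

    static class RequestHandler {
        boolean failed = false;
        void fatalError(Exception e) { failed = true; }
    }

    State state = State.INITIALIZING;
    final Deque<RequestHandler> pendingRequests = new ArrayDeque<>();
    RequestHandler inFlightRequest; // e.g. the InitProducerId request on the wire

    // Mirrors the reported bug: only pending (not yet sent) requests are failed,
    // so the state machine never leaves INITIALIZING and the sender loop spins.
    void authenticationFailedBuggy(Exception e) {
        for (RequestHandler r : pendingRequests) r.fatalError(e);
    }

    // Sketch of the suggested fix: also fail the in-flight request and move to
    // a terminal state so the producer can surface the error instead of spinning.
    void authenticationFailedFixed(Exception e) {
        for (RequestHandler r : pendingRequests) r.fatalError(e);
        if (inFlightRequest != null) inFlightRequest.fatalError(e);
        state = State.FATAL;
    }

    public static void main(String[] args) {
        AuthFailureSketch buggy = new AuthFailureSketch();
        buggy.inFlightRequest = new RequestHandler();
        buggy.authenticationFailedBuggy(new Exception("auth failed"));
        // In-flight request never failed; state stuck in INITIALIZING.
        System.out.println("buggy: inflightFailed=" + buggy.inFlightRequest.failed
                + " state=" + buggy.state);

        AuthFailureSketch fixed = new AuthFailureSketch();
        fixed.inFlightRequest = new RequestHandler();
        fixed.authenticationFailedFixed(new Exception("auth failed"));
        System.out.println("fixed: inflightFailed=" + fixed.inFlightRequest.failed
                + " state=" + fixed.state);
    }
}
```

The buggy path leaves the in-flight request unfailed and the state in INITIALIZING, which is exactly the non-self-healing busy loop described in steps 6 through 10.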
Re: [PR] KAFKA-9366: Upgrade log4j to log4j2 [kafka]
frankvicky commented on PR #17373:
URL: https://github.com/apache/kafka/pull/17373#issuecomment-2472887781

Hello @mimaison, @showuon,

Since this PR modifies a large number of files, particularly `build.gradle`, it's highly susceptible to conflicts with other PRs, making it rather exhausting to resolve these conflicts frequently. It would be helpful if we could merge this PR into trunk sooner, as there aren't any outstanding issues or points of contention with it. This would also allow us to begin addressing any follow-up issues.

Many thanks.
[PR] KAFKA-17593; [6/N] Add new record to GroupCoordinatorRecordSerde [kafka]
dajac opened a new pull request, #17791:
URL: https://github.com/apache/kafka/pull/17791

This patch extends `GroupCoordinatorRecordSerde` to support the `ConsumerGroupRegularExpression` record.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-18004: Use 3.8 to run zk service for e2e [kafka]
FrankYang0529 commented on code in PR #17790:
URL: https://github.com/apache/kafka/pull/17790#discussion_r1839920307

## tests/kafkatest/services/zookeeper.py:
```
@@ -44,7 +44,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
     }

     def __init__(self, context, num_nodes, zk_sasl = False, zk_client_port = True, zk_client_secure_port = False,
-                 zk_tls_encrypt_only = False, version=DEV_BRANCH):
```

Review Comment: Hi @chia7712, I replaced all dev with 3.8 in this file and added a comment. Could you take a look again when you have time? Thank you.
[jira] [Commented] (KAFKA-15895) Move DynamicBrokerConfig to server module
[ https://issues.apache.org/jira/browse/KAFKA-15895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17897938#comment-17897938 ]

Nikolay Izhikov commented on KAFKA-15895:
-----------------------------------------
Hello, [~omnia_h_ibrahim]. Are you working on this ticket?

> Move DynamicBrokerConfig to server module
> -----------------------------------------
>
>                 Key: KAFKA-15895
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15895
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Omnia Ibrahim
>            Assignee: Omnia Ibrahim
>            Priority: Major
[PR] KAFKA-18006: Add 3.9.0 to end-to-end test [kafka]
frankvicky opened a new pull request, #17797:
URL: https://github.com/apache/kafka/pull/17797

JIRA: KAFKA-18006

Since 3.9 is released, we should add it to the e2e tests.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Assigned] (KAFKA-15895) Move DynamicBrokerConfig to server module
[ https://issues.apache.org/jira/browse/KAFKA-15895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nikolay Izhikov reassigned KAFKA-15895:
---------------------------------------
    Assignee: Nikolay Izhikov  (was: Omnia Ibrahim)

> Move DynamicBrokerConfig to server module
> -----------------------------------------
>
>                 Key: KAFKA-15895
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15895
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Omnia Ibrahim
>            Assignee: Nikolay Izhikov
>            Priority: Major
[jira] [Commented] (KAFKA-14588) Move ConfigCommand to tools
[ https://issues.apache.org/jira/browse/KAFKA-14588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17897944#comment-17897944 ]

Nikolay Izhikov commented on KAFKA-14588:
-----------------------------------------
[~loganzhu] Currently, this ticket is blocked by KAFKA-15895. I will try to work on it and move on with the command migration.

> Move ConfigCommand to tools
> ---------------------------
>
>                 Key: KAFKA-14588
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14588
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Mickael Maison
>            Assignee: Nikolay Izhikov
>            Priority: Major
Re: [PR] KAFKA-17995: Large value for retention.ms could prevent remote data cleanup in Tiered Storage [kafka]
FrankYang0529 commented on code in PR #17794:
URL: https://github.com/apache/kafka/pull/17794#discussion_r1840306519

## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
```
@@ -2765,6 +2765,34 @@ public void testDeleteRetentionMsBeingCancelledBeforeSecondDelete() throws Remot
     verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(1));
 }

+@Test
+public void testDeleteRetentionMsBiggerThanTimeMs() throws RemoteStorageException, ExecutionException, InterruptedException {
+    LogConfig mockLogConfig = new LogConfig(Map.of("retention.ms", time.milliseconds() + 100));
```

Review Comment: Yes, set it to 1 month after the current epoch. Thanks for the suggestion 👍.
Re: [PR] MINOR: remove LogDirFailureTest.testZkBrokerWithOldInterBrokerProtocolShouldHaltOnLogDirFailure [kafka]
chia7712 merged PR #17785: URL: https://github.com/apache/kafka/pull/17785 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-14585) Move StorageTool to tools
[ https://issues.apache.org/jira/browse/KAFKA-14585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nikolay Izhikov reassigned KAFKA-14585: --- Assignee: Nikolay Izhikov > Move StorageTool to tools > - > > Key: KAFKA-14585 > URL: https://issues.apache.org/jira/browse/KAFKA-14585 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Nikolay Izhikov >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15718) KRaft support in UncleanLeaderElectionTest
[ https://issues.apache.org/jira/browse/KAFKA-15718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gantigmaa Selenge resolved KAFKA-15718. --- Fix Version/s: 3.8.0 Resolution: Fixed > KRaft support in UncleanLeaderElectionTest > -- > > Key: KAFKA-15718 > URL: https://issues.apache.org/jira/browse/KAFKA-15718 > Project: Kafka > Issue Type: Task > Components: core >Reporter: Sameer Tejani >Assignee: Gantigmaa Selenge >Priority: Minor > Labels: kraft, kraft-test, newbie > Fix For: 3.8.0 > > > The following tests in UncleanLeaderElectionTest in > core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala > need to be updated to support KRaft > 103 : def testUncleanLeaderElectionEnabled(): Unit = { > 116 : def testUncleanLeaderElectionDisabled(): Unit = { > 127 : def testUncleanLeaderElectionEnabledByTopicOverride(): Unit = { > 142 : def testUncleanLeaderElectionDisabledByTopicOverride(): Unit = { > 157 : def testUncleanLeaderElectionInvalidTopicOverride(): Unit = { > 286 : def testTopicUncleanLeaderElectionEnable(): Unit = { > Scanned 358 lines. Found 0 KRaft tests out of 6 tests -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-10532: Handle state updater failed tasks [kafka]
eduwercamacaro commented on PR #17761: URL: https://github.com/apache/kafka/pull/17761#issuecomment-2473986471 Pinging @cadonna @mjsax @ableegoldman for review -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17648: AsyncKafkaConsumer#unsubscribe swallow TopicAuthorizationException and GroupAuthorizationException [kafka]
FrankYang0529 commented on code in PR #17516: URL: https://github.com/apache/kafka/pull/17516#discussion_r1840722461 ## core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala: ## @@ -132,6 +133,84 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest { assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testConsumeUnsubscribeWithoutGroupPermission(quorum: String, groupProtocol: String): Unit = { +val topic = "topic" + +createTopic(topic, listenerName = interBrokerListenerName) + +// allow topic read/write permission to poll/send record +addAndVerifyAcls( + Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW), createAcl(AclOperation.READ, AclPermissionType.ALLOW)), + new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL) +) +val producer = createProducer() +producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get() +producer.close() + +// allow group read permission to join group +val group = "group" +addAndVerifyAcls( + Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)), + new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL) +) + +val props = new Properties() +props.put(ConsumerConfig.GROUP_ID_CONFIG, group) +props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") +val consumer = createConsumer(configOverrides = props) +consumer.subscribe(List(topic).asJava) +TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1) + +removeAndVerifyAcls( + Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)), + new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL) +) + +assertDoesNotThrow(new Executable { + override def execute(): Unit = consumer.unsubscribe() +}) + } + + @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testConsumeCloseWithoutGroupPermission(quorum: String, groupProtocol: String): Unit = { Review Comment: It looks like https://github.com/apache/kafka/pull/16686 is almost ready. I will remove `@Disabled` after it's merged. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14588) Move ConfigCommand to tools
[ https://issues.apache.org/jira/browse/KAFKA-14588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17897969#comment-17897969 ] Logan Zhu commented on KAFKA-14588: --- [~nizhikov] Thank you for the update! I appreciate it. When the blocking issue (KAFKA-15895) is resolved, please feel free to assign the ticket to me if you’re not planning to move forward with it. > Move ConfigCommand to tools > --- > > Key: KAFKA-14588 > URL: https://issues.apache.org/jira/browse/KAFKA-14588 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Nikolay Izhikov >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17948: Potential issue during tryComplete and onComplete simultaneous calls to access global variables [kafka]
adixitconfluent commented on code in PR #17739: URL: https://github.com/apache/kafka/pull/17739#discussion_r1840737488 ## core/src/main/java/kafka/server/share/SharePartition.java: ## @@ -1602,8 +1602,6 @@ protected void updateFetchOffsetMetadata(Optional fetchOffset protected Optional fetchOffsetMetadata() { lock.readLock().lock(); try { -if (findNextFetchOffset.get()) Review Comment: Hi @junrao, now that I think more about it, IIUC, considering the common case when all fetched data is acquirable - 1. acknowledgements/acquisition lock timeout/ release of records on session close are the only places where we set `findNextFetchOffset` to true 2. In all the 3 scenarios mentioned above, if there is a change to the `endOffset`, we update the `endOffset` (thereby `fetchOffsetMetadata` is also updated automatically with our changes) Hence, I feel that the findNextFetchOffset shouldn't be considered when dealing with the common case. In the less common cases, when Log Start Offset is later than the fetch offset and we need to archive records, then we set `findNextFetchOffset` to true. But we have done the minBytes implementation only for the common cases right now, hence I feel the current change is correct. Please correct me if I am wrong. cc - @apoorvmittal10 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Update javadoc on split to mention first matching [kafka]
bbejeck opened a new pull request, #17799: URL: https://github.com/apache/kafka/pull/17799 Clarify the functionality of `split` matching on first predicate ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
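For context, the semantics being documented are that `split()` routes each record to the first predicate that matches, even when several predicates would accept it. A minimal stand-alone sketch of that first-match routing (plain Java illustration of the behavior, not the Streams API itself):

```java
import java.util.List;
import java.util.function.IntPredicate;

public class SplitSemantics {
    // Route a record to the index of the FIRST matching predicate,
    // mirroring the if/else-if semantics the javadoc change clarifies.
    static int firstMatch(List<IntPredicate> predicates, int record) {
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(record)) {
                return i;
            }
        }
        return -1; // no predicate matched (would fall through to the default branch)
    }

    public static void main(String[] args) {
        List<IntPredicate> branches = List.of(n -> n % 2 == 0, n -> n > 10);
        // 12 satisfies both predicates, but only branch 0 receives it.
        System.out.println(firstMatch(branches, 12)); // 0
    }
}
```

In Kafka Streams terms, a record matching the predicates of branches 0 and 1 lands only in branch 0, so the order in which branches are declared matters.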
[jira] [Created] (KAFKA-18008) Flaky testMultiConsumerSessionTimeoutOnStopPolling
Lianet Magrans created KAFKA-18008: -- Summary: Flaky testMultiConsumerSessionTimeoutOnStopPolling Key: KAFKA-18008 URL: https://issues.apache.org/jira/browse/KAFKA-18008 Project: Kafka Issue Type: Test Components: clients, consumer Reporter: Lianet Magrans [https://ge.apache.org/scans/tests?search.buildOutcome=failure&search.rootProjectNames=kafka&search.startTimeMax=1731513167005&search.startTimeMin=172905120&search.tags=trunk&search.timeZoneId=America%2FToronto&tests.container=kafka.api.PlaintextConsumerPollTest&tests.test=testMultiConsumerSessionTimeoutOnStopPolling(String%2C%20String)%5B2%5D] I wonder if the in-flight fix on [https://github.com/apache/kafka/pull/17789] is behind this one too. Creating this Jira for visibility, we might just close it if we confirm the fix, or leave it if this needs more work. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-18006: Add 3.9.0 to end-to-end test (core, client) [kafka]
frankvicky commented on code in PR #17797: URL: https://github.com/apache/kafka/pull/17797#discussion_r1840779166 ## tests/kafkatest/services/zookeeper.py: ## @@ -45,7 +45,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service): # After 4.0, zookeeper service is removed from source code. Using LATEST_3_8 for compatibility test cases. Review Comment: Thanks for pointing that out. 😺 Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Update javadoc on split to mention first matching [kafka]
bbejeck merged PR #17799: URL: https://github.com/apache/kafka/pull/17799 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18006: Add 3.9.0 to end-to-end test (core, client) [kafka]
frankvicky commented on PR #17797: URL: https://github.com/apache/kafka/pull/17797#issuecomment-2474156464 Ran `client_compatibility_produce_consume_test.py` for validation purposes, and got the following failures: ``` FAILED TEST SYMBOLS Pass the test symbols below to your ducktape run 'tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py::ClientCompatibilityProduceConsumeTest.test_produce_consume@{"broker_version":"2.1.1"}' 'tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py::ClientCompatibilityProduceConsumeTest.test_produce_consume@{"broker_version":"2.2.2"}' 'tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py::ClientCompatibilityProduceConsumeTest.test_produce_consume@{"broker_version":"2.3.1"}' ducker-ak test failed ``` These failures are handled by #17625 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Update javadoc on split to mention first matching [kafka]
bbejeck commented on PR #17799: URL: https://github.com/apache/kafka/pull/17799#issuecomment-2474163159 Merged #17799 into trunk -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-18006: Add 3.9.0 to end-to-end test (streams) [kafka]
frankvicky opened a new pull request, #17800: URL: https://github.com/apache/kafka/pull/17800 JIRA: KAFKA-18006 Since 3.9 is released, we should add it to the e2e-test. Follow-up of #17797 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-17278: Add FetchRequest compatibility tests for KafkaRaftClient [kafka]
kevin-wu24 opened a new pull request, #17801: URL: https://github.com/apache/kafka/pull/17801 Adding unit tests for KafkaRaftClient to catch the case discussed here: https://github.com/apache/kafka/pull/16235#discussion_r1672760435 Makes sure that an older quorum controller can handle a newer request version. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Update javadoc on split to mention first matching [kafka]
bbejeck commented on PR #17799: URL: https://github.com/apache/kafka/pull/17799#issuecomment-2474217154 Cherry picked to 3.9, 3.8, and 3.7 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17593; [6/N] Add new record to GroupCoordinatorRecordSerde [kafka]
dajac merged PR #17791: URL: https://github.com/apache/kafka/pull/17791 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-17917) Convert Kafka core system tests to use KRaft
[ https://issues.apache.org/jira/browse/KAFKA-17917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17897990#comment-17897990 ] Kevin Wu commented on KAFKA-17917: -- Hey [~yx9o], are you still planning to take on this work or already working on this? No rush, but my plan is to start working on this next week, so I want to make sure I'm not duplicating effort. > Convert Kafka core system tests to use KRaft > > > Key: KAFKA-17917 > URL: https://issues.apache.org/jira/browse/KAFKA-17917 > Project: Kafka > Issue Type: Improvement > Components: core, system tests >Affects Versions: 4.0.0 >Reporter: Kevin Wu >Priority: Blocker > > The downgrade, group mode transactions, security rolling upgrade, and > throttling test should be migrated to using KRaft. The network degrade test > should be refactored to use KafkaService rather than ZookeeperService. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-18009) Remove spurious public constructor for KafkaShareConsumer
Andrew Schofield created KAFKA-18009: Summary: Remove spurious public constructor for KafkaShareConsumer Key: KAFKA-18009 URL: https://issues.apache.org/jira/browse/KAFKA-18009 Project: Kafka Issue Type: Sub-task Reporter: Andrew Schofield Assignee: Andrew Schofield Viewing the javadoc, I can see there are 5 constructors instead of the expected 4. The extra constructor uses an internal class as a parameter and should not be part of the public interface. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17972: Default all JUnit tests to KRaft [kafka]
cmccabe commented on PR #17727: URL: https://github.com/apache/kafka/pull/17727#issuecomment-2474335828 Convert SaslClientsWithInvalidCredentialsTest.scala to KRaft https://github.com/apache/kafka/pull/17803 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: convert SaslClientsWithInvalidCredentialsTest.scala to KRaft [kafka]
cmccabe opened a new pull request, #17803: URL: https://github.com/apache/kafka/pull/17803 (no comment) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17850: fix leaking internal exception in state manager [kafka]
sebastienviale commented on code in PR #17711: URL: https://github.com/apache/kafka/pull/17711#discussion_r1839932320 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java: ## @@ -538,13 +539,16 @@ public void flush() { } catch (final RuntimeException exception) { if (firstException == null) { // do NOT wrap the error if it is actually caused by Streams itself -if (exception instanceof StreamsException) +// In case of FailedProcessingException Do not keep the failed processing exception in the stack trace +if (exception instanceof FailedProcessingException) +firstException = new StreamsException(exception.getCause()); Review Comment: In case of FailedProcessingException, wrap it into a ProcessorStateException ``` if (exception instanceof FailedProcessingException) { firstException = new ProcessorStateException(exception.getCause()); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
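The unwrapping proposed above can be illustrated in isolation. A hedged sketch, using simplified stand-ins for the Streams exception classes (the names mirror the real ones, but the classes and the `classify` helper below are illustrative, not the actual `ProcessorStateManager` code):

```java
public class FlushErrorHandling {
    // Simplified stand-ins for the Kafka Streams exception hierarchy.
    static class StreamsException extends RuntimeException {
        StreamsException(Throwable cause) { super(cause); }
    }
    static class FailedProcessingException extends StreamsException {
        FailedProcessingException(Throwable cause) { super(cause); }
    }
    static class ProcessorStateException extends StreamsException {
        ProcessorStateException(Throwable cause) { super(cause); }
    }

    // The handling discussed above: unwrap FailedProcessingException so the
    // internal wrapper does not leak into the reported stack trace, while
    // still not re-wrapping errors raised by Streams itself.
    static RuntimeException classify(RuntimeException exception) {
        if (exception instanceof FailedProcessingException) {
            return new ProcessorStateException(exception.getCause());
        }
        if (exception instanceof StreamsException) {
            return exception; // do NOT wrap the error if Streams raised it
        }
        return new ProcessorStateException(exception);
    }

    public static void main(String[] args) {
        RuntimeException cause = new IllegalStateException("flush failed");
        RuntimeException reported = classify(new FailedProcessingException(cause));
        System.out.println(reported.getClass().getSimpleName()); // ProcessorStateException
        System.out.println(reported.getCause() == cause);        // true
    }
}
```

The point of the unwrap is that the exception surfaced to the user carries the real cause of the flush failure rather than the internal `FailedProcessingException` wrapper.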
[PR] [WIP] KAFKA-17928: Make remote log manager thread-pool configs dynamic [kafka]
peterxcli opened a new pull request, #17793: URL: https://github.com/apache/kafka/pull/17793 ### Description Add ability to dynamically update remote storage thread pool sizes and implement partition readiness check: - Add methods to RemoteLogManager to update thread pool sizes: - updateCopyThreadPoolSize - updateExpirationThreadPoolSize - updateReaderThreadPoolSize - Add thread pool configuration properties to DynamicRemoteLogConfig - Add RLMM readiness check before executing remote storage tasks - Update validation and reconfiguration logic in DynamicRemoteLogConfig This change allows operators to tune thread pool sizes at runtime and prevents operations on partitions that aren't ready for remote storage operations. ### Related Link - [KIP-1105](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1105%3A+Make+remote+log+manager+thread-pool+configs+dynamic) - [KAFKA-17928](https://issues.apache.org/jira/browse/KAFKA-17928?jql=project%20%3D%20KAFKA%20AND%20resolution%20%3D%20Unresolved%20AND%20assignee%20in%20(currentUser())%20ORDER%20BY%20priority%20DESC,%20updated%20DESC) *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
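Runtime resizing of this kind is ultimately backed by the JDK's `ThreadPoolExecutor.setCorePoolSize`/`setMaximumPoolSize`, which take effect on a live pool. A small sketch of how a dynamic config update could be applied (assumption: the remote log manager's pools behave like a plain `ThreadPoolExecutor`; the `resize` helper below is illustrative, not the PR's actual API):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DynamicPoolSize {
    // Apply a new size to a running pool, as a dynamic config update would.
    // Order matters when growing: raise the maximum before the core size,
    // because setCorePoolSize rejects a core size above the current maximum.
    static void resize(ThreadPoolExecutor pool, int newSize) {
        if (newSize > pool.getMaximumPoolSize()) {
            pool.setMaximumPoolSize(newSize);
            pool.setCorePoolSize(newSize);
        } else {
            pool.setCorePoolSize(newSize);
            pool.setMaximumPoolSize(newSize);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        resize(pool, 4); // e.g. an operator raises the copier thread count
        System.out.println(pool.getCorePoolSize()); // 4
        pool.shutdown();
    }
}
```

This is why such configs can be made dynamic without a broker restart: no executor needs to be torn down and rebuilt to change its thread count.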
[PR] KAFKA-17995: Large value for retention.ms could prevent remote data cleanup in Tiered Storage [kafka]
FrankYang0529 opened a new pull request, #17794: URL: https://github.com/apache/kafka/pull/17794 If a user has configured value of `retention.ms` to a value > current unix timestamp epoch, then `cleanupUntilMs` becomes negative. https://github.com/apache/kafka/blob/5a5239770ff3565233e5cbecf11446e76339f8fe/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1397 This leads to cleaner failures and all cleaning for that topic partition stops. https://github.com/apache/kafka/blob/5a5239770ff3565233e5cbecf11446e76339f8fe/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L2218 To fix this, return an empty `RetentionTimeData` if `retention.ms` is bigger than current unix timestamp. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
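The failure mode is plain arithmetic: the cutoff is computed as `now - retention.ms`, which goes negative whenever `retention.ms` exceeds the current Unix epoch millis. A simplified sketch of the guard described above (the method name and `OptionalLong` return type are illustrative, not the actual `RemoteLogManager` signature):

```java
import java.util.OptionalLong;

public class RetentionCutoff {
    // Returns the cutoff timestamp for time-based retention, or empty when
    // retention.ms is larger than the current epoch — i.e. skip time-based
    // cleanup instead of producing a negative cutoff that fails the cleaner.
    static OptionalLong cleanupUntilMs(long nowMs, long retentionMs) {
        long cutoff = nowMs - retentionMs;
        return cutoff < 0 ? OptionalLong.empty() : OptionalLong.of(cutoff);
    }

    public static void main(String[] args) {
        long nowMs = 1_731_500_000_000L;      // some current epoch millis
        long hugeRetention = nowMs + 100;      // retention.ms > current time
        System.out.println(cleanupUntilMs(nowMs, hugeRetention).isPresent()); // false
        System.out.println(cleanupUntilMs(nowMs, 86_400_000L).isPresent());   // true
    }
}
```

Without the guard, every cleanup pass on such a topic would compute a negative cutoff, fail, and — as the PR notes — stop all cleaning for that topic-partition.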
Re: [PR] KAFKA-17648: AsyncKafkaConsumer#unsubscribe swallow GroupAuthorizationException [kafka]
FrankYang0529 commented on code in PR #17516: URL: https://github.com/apache/kafka/pull/17516#discussion_r1839957439 ## core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala: ## @@ -132,6 +133,84 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest { assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testConsumeUnsubscribeWithoutGroupPermission(quorum: String, groupProtocol: String): Unit = { +val topic = "topic" + +createTopic(topic, listenerName = interBrokerListenerName) + +// allow topic read/write permission to poll/send record +addAndVerifyAcls( + Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW), createAcl(AclOperation.READ, AclPermissionType.ALLOW)), + new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL) +) +val producer = createProducer() +producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get() +producer.close() + +// allow group read permission to join group +val group = "group" +addAndVerifyAcls( + Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)), + new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL) +) + +val props = new Properties() +props.put(ConsumerConfig.GROUP_ID_CONFIG, group) +props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") +val consumer = createConsumer(configOverrides = props) +consumer.subscribe(List(topic).asJava) +TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1) + +removeAndVerifyAcls( + Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)), + new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL) +) + +assertDoesNotThrow(new Executable { + override def execute(): Unit = consumer.unsubscribe() +}) + } + + @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testConsumeCloseWithoutGroupPermission(quorum: String, groupProtocol: String): Unit = { Review Comment: Hi @lianetm, thanks for your review. I added test cases for `TopicAuthorizationException` and disabled the test cases for the `close` function. Could you help me review again when you have time? Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17850: fix leaking internal exception in state manager [kafka]
sebastienviale commented on code in PR #17711: URL: https://github.com/apache/kafka/pull/17711#discussion_r1839930318 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java: ## @@ -538,13 +539,16 @@ public void flush() { } catch (final RuntimeException exception) { if (firstException == null) { // do NOT wrap the error if it is actually caused by Streams itself -if (exception instanceof StreamsException) +// In case of FailedProcessingException Do not keep the failed processing exception in the stack trace +if (exception instanceof FailedProcessingException) +firstException = new StreamsException(exception.getCause()); +else if (exception instanceof StreamsException) firstException = exception; else firstException = new ProcessorStateException( format("%sFailed to flush state store %s", logPrefix, store.name()), exception); } -log.error("Failed to flush state store {}: ", store.name(), exception); +log.error("Failed to flush state store {}: ", store.name(), exception.getCause()); Review Comment: I copied the log.error in the `if (firstException == null)`: `log.error("Failed to flush cache of store {}: ", store.name(), firstException);` else `log.error("Failed to flush cache of store {}: ", store.name(), exception);` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17885: Enable clients to rebootstrap based on timeout or error code (KIP-1102) [kafka]
omkreddy commented on code in PR #17720: URL: https://github.com/apache/kafka/pull/17720#discussion_r1839870397 ## core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala: ## @@ -87,3 +128,17 @@ class ConsumerRebootstrapTest extends RebootstrapTest { producer.close() } } + +object ConsumerRebootstrapTest { + + final val RebootstrapTestName = s"${TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames}.useRebootstrapTriggerMs={2}" + def rebootstrapTestParams: stream.Stream[Arguments] = { +assertEquals(1, getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit.count()) Review Comment: nit: why do we need this assert statement? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-17993) reassign partition tool stuck with uncaught exception: 'value' field is too long to be serialized
[ https://issues.apache.org/jira/browse/KAFKA-17993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar updated KAFKA-17993: -- Description: Running the reassignment script when a topic had 5000 partitions, with both throttle options being set, the tool remained stuck with an exception The same json file previously passed the --verify step Reproduced on today's trunk (4.0), here's the Stack trace for 3.9.1-SNAPSHOT : {{[2024-11-12 16:15:43,516] ERROR Uncaught exception in thread 'kafka-admin-client-thread | reassign-partitions-tool': (org.apache.kafka.common.utils.KafkaThread)}} {{java.lang.RuntimeException: 'value' field is too long to be serialized}} {{ at org.apache.kafka.common.message.IncrementalAlterConfigsRequestData$AlterableConfig.addSize(IncrementalAlterConfigsRequestData.java:776)}} {{ at org.apache.kafka.common.message.IncrementalAlterConfigsRequestData$AlterConfigsResource.addSize(IncrementalAlterConfigsRequestData.java:463)}} {{ at org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.addSize(IncrementalAlterConfigsRequestData.java:187)}} {{ at org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)}} {{ at org.apache.kafka.common.protocol.SendBuilder.buildRequestSend(SendBuilder.java:187)}} {{ at org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:108)}} {{ at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:559)}} {{ at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:533)}} {{ at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:493)}} {{ at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.sendEligibleCalls(KafkaAdminClient.java:1317)}} {{ at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1530)}} {{ at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1473)}} {{ at
java.base/java.lang.Thread.run(Thread.java:840)}} was: Running the reassignment script for about 6000 partitions, with both throttle options being set, the tool remained stuck with this exception The same json file previously passed the --verify step reproduced on today's trunk (4.0), here's the Stack trace for 3.9.1-SNAPSHOT : {{[2024-11-12 16:15:43,516] ERROR Uncaught exception in thread 'kafka-admin-client-thread | reassign-partitions-tool': (org.apache.kafka.common.utils.KafkaThread)}} {{java.lang.RuntimeException: 'value' field is too long to be serialized}} {{ at org.apache.kafka.common.message.IncrementalAlterConfigsRequestData$AlterableConfig.addSize(IncrementalAlterConfigsRequestData.java:776)}} {{ at org.apache.kafka.common.message.IncrementalAlterConfigsRequestData$AlterConfigsResource.addSize(IncrementalAlterConfigsRequestData.java:463)}} {{ at org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.addSize(IncrementalAlterConfigsRequestData.java:187)}} {{ at org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)}} {{ at org.apache.kafka.common.protocol.SendBuilder.buildRequestSend(SendBuilder.java:187)}} {{ at org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:108)}} {{ at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:559)}} {{ at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:533)}} {{ at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:493)}} {{ at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.sendEligibleCalls(KafkaAdminClient.java:1317)}} {{ at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1530)}} {{ at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1473)}} {{ at java.base/java.lang.Thread.run(Thread.java:840)}} > reassign partition tool stuck with uncaught exception: 'value' field is too > long to be > serialized >
- > > Key: KAFKA-17993 > URL: https://issues.apache.org/jira/browse/KAFKA-17993 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 4.0.0, 3.6.2, 3.9.0 >Reporter: Edoardo Comar >Priority: Major > > Running the reassignment script when a topic had 5000 partitions, with both > throttle options being set, the tool remained stuck with an exception > The same json file previously passed the --verify step > Reproduced on today's trunk (4.0), here's the Stack trace for 3.9.1-SNAPSHOT : > {{[2024-11-12 16:15:43,516] ERROR Uncaught exception in thread > 'kafka-admin-client-thread | reassign-partitions-tool': > (org.apache.kafka.common.utils.KafkaThread)
[jira] [Comment Edited] (KAFKA-17993) reassign partition tool stuck with uncaught exception: 'value' field is too long to be serialized
[ https://issues.apache.org/jira/browse/KAFKA-17993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17897624#comment-17897624 ] Edoardo Comar edited comment on KAFKA-17993 at 11/13/24 9:58 AM:

on the topic with 5000 partitions the IncrementalAlterConfigsRequest for modifyTopicThrottles looks like:

AlterConfigOp{opType=SET, configEntry=ConfigEntry(name=leader.replication.throttled.replicas,
value=0:0,0:1,0:2,1000:0,1000:1,1000:2,1001:0,1001:1,1001:2,1002:0,1002:1,1002:2,1003:0,1003:1,1003:2,1004:0,1004:1,1004:2,

and the value was over 32k so could not be serialized

was (Author: ecomar): one topic had 5000 partitions and the IncrementalAlterConfigsRequest for modifyTopicThrottles looks like:

AlterConfigOp{opType=SET, configEntry=ConfigEntry(name=leader.replication.throttled.replicas,
value=0:0,0:1,0:2,1000:0,1000:1,1000:2,1001:0,1001:1,1001:2,1002:0,1002:1,1002:2,1003:0,1003:1,1003:2,1004:0,1004:1,1004:2,

and the value was over 32k so could not be serialized

> reassign partition tool stuck with uncaught exception: 'value' field is too long to be serialized
> -----
>
> Key: KAFKA-17993
> URL: https://issues.apache.org/jira/browse/KAFKA-17993
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 4.0.0, 3.6.2, 3.9.0
> Reporter: Edoardo Comar
> Priority: Major
>
> Running the reassignment script when a topic had 5000 partitions, with both throttle options being set, the tool remained stuck with an exception
> The same json file previously passed the --verify step
> Reproduced on today's trunk (4.0), here's the stack trace for 3.9.1-SNAPSHOT:
> [2024-11-12 16:15:43,516] ERROR Uncaught exception in thread 'kafka-admin-client-thread | reassign-partitions-tool': (org.apache.kafka.common.utils.KafkaThread)
> java.lang.RuntimeException: 'value' field is too long to be serialized
>     at org.apache.kafka.common.message.IncrementalAlterConfigsRequestData$AlterableConfig.addSize(IncrementalAlterConfigsRequestData.java:776)
>     at org.apache.kafka.common.message.IncrementalAlterConfigsRequestData$AlterConfigsResource.addSize(IncrementalAlterConfigsRequestData.java:463)
>     at org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.addSize(IncrementalAlterConfigsRequestData.java:187)
>     at org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
>     at org.apache.kafka.common.protocol.SendBuilder.buildRequestSend(SendBuilder.java:187)
>     at org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:108)
>     at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:559)
>     at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:533)
>     at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:493)
>     at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.sendEligibleCalls(KafkaAdminClient.java:1317)
>     at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1530)
>     at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1473)
>     at java.base/java.lang.Thread.run(Thread.java:840)

-- This message was sent by Atlassian Jira (v8.20.10#820010)
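For a sense of the scale involved: Kafka's wire protocol serializes a config value as a length-prefixed string whose length field is a signed 16-bit short, so anything above 32767 bytes fails with exactly this "field is too long to be serialized" error. A quick back-of-the-envelope sketch (Python; the partition:replica entry format follows the logged value above, and the helper name is made up for illustration):

```python
# Build the leader.replication.throttled.replicas value for a topic with
# 5000 partitions x 3 replicas, using the "partition:replica" entry format
# visible in the logged AlterConfigOp above. Helper name is illustrative.
def throttled_replicas_value(num_partitions, replicas_per_partition=3):
    entries = [
        f"{p}:{r}"
        for p in range(num_partitions)
        for r in range(replicas_per_partition)
    ]
    return ",".join(entries)

SHORT_MAX = 32767  # Kafka STRING fields carry a signed 16-bit length prefix

value = throttled_replicas_value(5000)
# The value is ~100 KB, roughly 3x the maximum serializable string length,
# which is why IncrementalAlterConfigsRequestData.addSize throws.
print(len(value), len(value) > SHORT_MAX)
```

This also explains why --verify passed: the limit is only hit when the throttle config value for all partitions is serialized into a single request field.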
Re: [PR] [Don't Merge] Test [kafka]
TaiJuWu commented on code in PR #16463: URL: https://github.com/apache/kafka/pull/16463#discussion_r1839868916

## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:

@@ -1023,20 +1038,29 @@ public long nextExpiryTimeMs() { }

     /* Visible for testing */
-    public Deque<ProducerBatch> getDeque(TopicPartition tp) {
+    Deque<ProducerBatch> getDeque(TopicPartition tp, Short acks) {
         TopicInfo topicInfo = topicInfoMap.get(tp.topic());
         if (topicInfo == null)
             return null;
-        return topicInfo.batches.get(tp.partition());
+        return topicInfo.batchesWithAcks.get(acks).get(tp.partition());
+    }
+
+    List<Deque<ProducerBatch>> getAllDequeueForPartition(TopicPartition tp) {
+        TopicInfo topicInfo = topicInfoMap.get(tp.topic());
+        if (topicInfo == null)
+            return null;
+        List<Deque<ProducerBatch>> res = new ArrayList<>();
+        topicInfo.batchesWithAcks.forEach((acks, batches) -> batches.forEach((topicPartition, dq) -> res.add(dq)));

Review Comment: Mistake

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
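The "Mistake" flagged above is that the new helper collects every deque in batchesWithAcks regardless of the requested partition. A filtered version would look roughly like this (a Python sketch over a hypothetical acks -> partition -> deque map, not the actual Java code):

```python
from collections import deque

# Hypothetical layout mirroring TopicInfo.batchesWithAcks: acks -> {partition -> deque}
batches_with_acks = {
    0: {0: deque(["b0"]), 1: deque(["b1"])},
    1: {0: deque(["b2"]), 1: deque(["b3"])},
}

def all_deques_for_partition(batches, partition):
    """Return only the deques belonging to `partition`, one per acks level."""
    return [
        dq
        for per_partition in batches.values()
        for p, dq in per_partition.items()
        if p == partition  # the flagged snippet omitted this filter and added every deque
    ]

print(all_deques_for_partition(batches_with_acks, 0))
```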
[PR] Add Kraft migration revert cleanup steps [kafka]
nicolasguyomar opened a new pull request, #17792: URL: https://github.com/apache/kafka/pull/17792 Some cleanup is needed before attempting a new migration ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14588 Tests for ConfigCommand of DynamicBrokerReconfigurationTest rewritten in java [kafka]
nizhikov closed pull request #15848: KAFKA-14588 Tests for ConfigCommand of DynamicBrokerReconfigurationTest rewritten in java URL: https://github.com/apache/kafka/pull/15848 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-15796) High CPU issue in Kafka Producer when Auth Failed
[ https://issues.apache.org/jira/browse/KAFKA-15796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17897855#comment-17897855 ] Alexandru Oiegas edited comment on KAFKA-15796 at 11/13/24 8:40 AM:

Hi, we have the same issue. Can this be fixed with higher prio? [~xiaotong.wang] [~pnee]

was (Author: JIRAUSER307707): Hi, we have the same issue. Can this be fixed with higher prio?

> High CPU issue in Kafka Producer when Auth Failed
> -----
>
> Key: KAFKA-15796
> URL: https://issues.apache.org/jira/browse/KAFKA-15796
> Project: Kafka
> Issue Type: Bug
> Components: clients, producer
> Affects Versions: 3.2.2, 3.2.3, 3.3.1, 3.3.2, 3.5.0, 3.4.1, 3.6.0, 3.5.1
> Reporter: xiaotong.wang
> Priority: Major
> Attachments: image-2023-11-07-14-18-32-016.png
>
> How to reproduce
> 1. kafka-client 3.x.x producer config enable.idempotence=true (this is the default)
> 2. start the kafka server, without the client user's auth info
> 3. start the client producer; after 3.x the producer will call initProducerId and the TCM state transitions to INITIALIZING
> 4. the server rejects the client request and the producer raises an AuthenticationException (org.apache.kafka.clients.producer.internals.Sender#maybeSendAndPollTransactionalRequest)
> 5. the kafka-client org.apache.kafka.clients.producer.internals.Sender#runOnce catches the AuthenticationException and calls transactionManager.authenticationFailed(e);
>
> synchronized void authenticationFailed(AuthenticationException e) {
>     for (TxnRequestHandler request : pendingRequests)
>         request.fatalError(e);
> }
>
> this method only handles pendingRequests, but in-flight requests are missed
> 6. the TCM state will stay in INITIALIZING forever, because of the judgment condition: currentState != State.INITIALIZING && !hasProducerId()
> 7. the producer sends a message; the message goes into the batch queue, the Sender wakes up and sets pollTimeout=0, preparing to send the message
> 8. but, before Sender sendProducerData, it filters messages: RecordAccumulator drain -->drainBatchesForOneNode-->shouldStopDrainBatchesForPartition returns true when producerIdAndEpoch.isValid()==false, so it will not collect any message
> 9. now the kafka producer network thread CPU usage goes to 100%
> 10. even if we add the user auth info and permission on the kafka server, it cannot self-heal
>
> suggestion: also catch AuthenticationException in org.apache.kafka.clients.producer.internals.Sender#maybeSendAndPollTransactionalRequest and respond failed to the in-flight InitProducerId request

-- This message was sent by Atlassian Jira (v8.20.10#820010)
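The failure mode in the quoted report (only pendingRequests are failed, so the in-flight InitProducerId never resolves and the state machine is pinned at INITIALIZING) can be illustrated with a toy model. This is Python with invented names, not Kafka's actual implementation:

```python
# Toy model of the reported bug: authenticationFailed fails only *pending*
# requests, so an in-flight InitProducerId is never completed and the state
# machine stays in INITIALIZING. All names are invented for illustration.
class ToyTransactionManager:
    def __init__(self):
        self.state = "INITIALIZING"
        self.pending_requests = []
        self.inflight_requests = []

    def authentication_failed_buggy(self, error):
        # Mirrors the quoted snippet: pendingRequests only.
        for req in self.pending_requests:
            req["error"] = error
        self.pending_requests.clear()

    def authentication_failed_fixed(self, error):
        # Direction suggested in the report: also fail in-flight requests,
        # letting the state machine leave INITIALIZING.
        for req in self.pending_requests + self.inflight_requests:
            req["error"] = error
        self.pending_requests.clear()
        self.inflight_requests.clear()
        self.state = "FATAL_ERROR"

tm = ToyTransactionManager()
tm.inflight_requests.append({"name": "InitProducerId", "error": None})
tm.authentication_failed_buggy("AuthenticationException")
print(tm.state, tm.inflight_requests)  # stuck: the in-flight request still has no error
```

With the state stuck, the sender loop keeps waking with pollTimeout=0 and draining nothing, which is the 100% CPU spin described in steps 7-9.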
Re: [PR] KAFKA-17850: fix leaking internal exception in state manager [kafka]
cadonna commented on code in PR #17711: URL: https://github.com/apache/kafka/pull/17711#discussion_r1839727730

## streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:

@@ -538,13 +539,16 @@ public void flush() {
     } catch (final RuntimeException exception) {
         if (firstException == null) {
             // do NOT wrap the error if it is actually caused by Streams itself
-            if (exception instanceof StreamsException)
+            // In case of FailedProcessingException Do not keep the failed processing exception in the stack trace
+            if (exception instanceof FailedProcessingException)
+                firstException = new StreamsException(exception.getCause());
+            else if (exception instanceof StreamsException)
                 firstException = exception;
             else
                 firstException = new ProcessorStateException(
                     format("%sFailed to flush state store %s", logPrefix, store.name()), exception);
         }
-        log.error("Failed to flush state store {}: ", store.name(), exception);
+        log.error("Failed to flush state store {}: ", store.name(), exception.getCause());

Review Comment: Why did you change this? If a `StreamsException` that is NOT a `FailedProcessingException` is thrown, it would be perfectly fine to log the exception. You could do something like:
```java
log.error("Failed to flush state store {}: ", store.name(), exception instanceof FailedProcessingException ? exception.getCause() : exception);
```
or maybe it is cleaner to copy the log message inside the `if (firstException == null) {`.

## streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:

@@ -538,13 +539,16 @@ public void flush() {
     } catch (final RuntimeException exception) {
         if (firstException == null) {
             // do NOT wrap the error if it is actually caused by Streams itself
-            if (exception instanceof StreamsException)
+            // In case of FailedProcessingException Do not keep the failed processing exception in the stack trace
+            if (exception instanceof FailedProcessingException)

Review Comment: Could you please add unit tests to `ProcessorStateManagerTest` that verify the correct behavior? You should also verify the behavior when a processing error handler that continues instead of fails is set.

## streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:

@@ -538,13 +539,16 @@ public void flush() {
     } catch (final RuntimeException exception) {
         if (firstException == null) {
             // do NOT wrap the error if it is actually caused by Streams itself
-            if (exception instanceof StreamsException)
+            // In case of FailedProcessingException Do not keep the failed processing exception in the stack trace
+            if (exception instanceof FailedProcessingException)
+                firstException = new StreamsException(exception.getCause());

Review Comment: Before we introduced the `FailedProcessingException` we did not wrap the exception into a plain `StreamsException` but we wrapped it into a `ProcessorStateException` in the `else`-branch. We should keep that behavior.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
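The unwrapping rule under discussion (surface the cause of the internal FailedProcessingException, but pass any other exception through untouched) can be sketched compactly. This is a Python sketch with stand-in exception classes, not the actual Streams code:

```python
# Stand-in exception classes loosely mirroring the Streams hierarchy;
# this sketches the suggested logging/wrapping rule, not Kafka's code.
class StreamsException(Exception): ...
class FailedProcessingException(StreamsException): ...

def exception_to_surface(exception):
    # Equivalent of: exception instanceof FailedProcessingException
    #                    ? exception.getCause() : exception
    if isinstance(exception, FailedProcessingException):
        return exception.__cause__
    return exception

cause = ValueError("flush failed in user processor")
wrapper = FailedProcessingException("internal wrapper")
wrapper.__cause__ = cause  # Python's analogue of a wrapped cause

print(exception_to_surface(wrapper))   # the cause, without the internal wrapper
print(exception_to_surface(StreamsException("boom")))  # passed through unchanged
```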
[jira] [Resolved] (KAFKA-17985) Set default value for share.auto.offset.reset in ShareRoundTripWorker.
[ https://issues.apache.org/jira/browse/KAFKA-17985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shivsundar R resolved KAFKA-17985. -- Resolution: Fixed > Set default value for share.auto.offset.reset in ShareRoundTripWorker. > -- > > Key: KAFKA-17985 > URL: https://issues.apache.org/jira/browse/KAFKA-17985 > Project: Kafka > Issue Type: Sub-task >Reporter: Shivsundar R >Assignee: Shivsundar R >Priority: Major > > After the dynamic config share.auto.offset.reset was implemented, we need to > explicitly set the config to "earliest" as the default value is "latest". -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17648: AsyncKafkaConsumer#unsubscribe swallow GroupAuthorizationException [kafka]
FrankYang0529 commented on code in PR #17516: URL: https://github.com/apache/kafka/pull/17516#discussion_r1839785185 ## core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala: ## @@ -132,6 +133,84 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest { assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testConsumeUnsubscribeWithoutGroupPermission(quorum: String, groupProtocol: String): Unit = { +val topic = "topic" + +createTopic(topic, listenerName = interBrokerListenerName) + +// allow topic read/write permission to poll/send record +addAndVerifyAcls( + Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW), createAcl(AclOperation.READ, AclPermissionType.ALLOW)), + new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL) +) +val producer = createProducer() +producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get() +producer.close() + +// allow group read permission to join group +val group = "group" +addAndVerifyAcls( + Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)), + new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL) +) + +val props = new Properties() +props.put(ConsumerConfig.GROUP_ID_CONFIG, group) +props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") +val consumer = createConsumer(configOverrides = props) +consumer.subscribe(List(topic).asJava) +TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1) + +removeAndVerifyAcls( + Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)), + new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL) +) + +assertDoesNotThrow(new Executable { + override def execute(): Unit = consumer.unsubscribe() +}) + } + + @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testConsumeCloseWithoutGroupPermission(quorum: String, groupProtocol: String): Unit = { Review Comment: Hi @lianetm, I would like to confirm again: do you mean that we disable `close` test cases currently, revert change in `AsyncKafkaConsumer#releaseAssignmentAndLeaveGroup` function, and then we will enable this test after #16686 is merged? Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17109: Reduce log message load for failed locking [kafka]
cadonna merged PR #16705: URL: https://github.com/apache/kafka/pull/16705 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-17993) reassign partition tool stuck with uncaught exception: 'value' field is too long to be serialized
[ https://issues.apache.org/jira/browse/KAFKA-17993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar reassigned KAFKA-17993:

Assignee: Edoardo Comar

> reassign partition tool stuck with uncaught exception: 'value' field is too long to be serialized
> -----
>
> Key: KAFKA-17993
> URL: https://issues.apache.org/jira/browse/KAFKA-17993
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 4.0.0, 3.6.2, 3.9.0
> Reporter: Edoardo Comar
> Assignee: Edoardo Comar
> Priority: Major
>
> Running the reassignment script when a topic had 5000 partitions, with both throttle options being set, the tool remained stuck with an exception
> The same json file previously passed the --verify step
> Reproduced on today's trunk (4.0), here's the stack trace for 3.9.1-SNAPSHOT:
> [2024-11-12 16:15:43,516] ERROR Uncaught exception in thread 'kafka-admin-client-thread | reassign-partitions-tool': (org.apache.kafka.common.utils.KafkaThread)
> java.lang.RuntimeException: 'value' field is too long to be serialized
>     at org.apache.kafka.common.message.IncrementalAlterConfigsRequestData$AlterableConfig.addSize(IncrementalAlterConfigsRequestData.java:776)
>     at org.apache.kafka.common.message.IncrementalAlterConfigsRequestData$AlterConfigsResource.addSize(IncrementalAlterConfigsRequestData.java:463)
>     at org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.addSize(IncrementalAlterConfigsRequestData.java:187)
>     at org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
>     at org.apache.kafka.common.protocol.SendBuilder.buildRequestSend(SendBuilder.java:187)
>     at org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:108)
>     at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:559)
>     at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:533)
>     at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:493)
>     at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.sendEligibleCalls(KafkaAdminClient.java:1317)
>     at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1530)
>     at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1473)
>     at java.base/java.lang.Thread.run(Thread.java:840)

-- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR Fix a few test names [kafka]
mumrah merged PR #17788: URL: https://github.com/apache/kafka/pull/17788 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Add Kraft migration revert cleanup steps [kafka]
cmccabe commented on code in PR #17792: URL: https://github.com/apache/kafka/pull/17792#discussion_r1840929276 ## docs/ops.html: ## @@ -4182,6 +4182,14 @@ Reverting to ZooKeeper mode During the Migration If you did not fully complete any step, back out whatever you have done and then follow revert directions for the last fully completed step. + +In any case, once reverting is completed, you must clean-up any migration leftover before you attempt a new one + + +Remove __cluster_metadata logs directory from KRaft controllers +Remove __cluster_metadata logs directory from Zookeeper brokers Review Comment: They'll be removed from the brokers automatically. The controllers are the ones that need to be manually removed. (We should probably add a note about this.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR Fix a few test names [kafka]
mumrah commented on PR #17788: URL: https://github.com/apache/kafka/pull/17788#issuecomment-2474400495 No worries, @bbejeck. Thanks for taking a look 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-15995: Adding KIP-877 support to Connect [kafka]
mimaison opened a new pull request, #17804: URL: https://github.com/apache/kafka/pull/17804 Built on top of #17511, the 2nd commit adds KIP-877 support to Connect ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-17992: Remove `getUnderlying` and `isKRaftTest` from ClusterInstance [kafka]
Yunyung opened a new pull request, #17802: URL: https://github.com/apache/kafka/pull/17802 As title, - Since ZK will be removed in version 4.0, `isKRaftTest` is no longer required. - To avoid directly exposing or returning the underlying object through interface `getUnderlying`, which is responsible for setting up and tearing down the cluster, we plan to remove `getUnderlying`. We plan to design and implement a better interface in the future. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18001: Support UpdateRaftVoterRequest in KafkaNetworkChannel (WIP) [kafka]
cmccabe commented on code in PR #17773: URL: https://github.com/apache/kafka/pull/17773#discussion_r1841009664

## raft/src/main/java/org/apache/kafka/raft/KafkaNetworkChannel.java:

@@ -187,6 +189,8 @@ static AbstractRequest.Builder buildRequest(ApiMessag
         return new FetchRequest.SimpleBuilder((FetchRequestData) requestData);
     if (requestData instanceof FetchSnapshotRequestData)
         return new FetchSnapshotRequest.Builder((FetchSnapshotRequestData) requestData);
+    if (requestData instanceof UpdateRaftVoterRequestData)

Review Comment: Do the controllers send `RemoveRaftVoterRequestData` and `AddRaftVoterRequestData` here? I don't think so?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18001: Support UpdateRaftVoterRequest in KafkaNetworkChannel (WIP) [kafka]
ahuang98 commented on code in PR #17773: URL: https://github.com/apache/kafka/pull/17773#discussion_r1841016340

## raft/src/main/java/org/apache/kafka/raft/KafkaNetworkChannel.java:

@@ -187,6 +189,8 @@ static AbstractRequest.Builder buildRequest(ApiMessag
         return new FetchRequest.SimpleBuilder((FetchRequestData) requestData);
     if (requestData instanceof FetchSnapshotRequestData)
         return new FetchSnapshotRequest.Builder((FetchSnapshotRequestData) requestData);
+    if (requestData instanceof UpdateRaftVoterRequestData)

Review Comment: that's what I thought as well, I don't think we need to add either here

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17783: Adding listeners to remove share partition on partition changes [kafka]
apoorvmittal10 commented on PR #17796: URL: https://github.com/apache/kafka/pull/17796#issuecomment-2474212658 Thanks @AndrewJSchofield for the review and good points. > Can the partition become a leader after it has previously become a follower? I just wonder whether the state machine is more complicated than this PR implies? Yes, that can always happen. Once it does, the next share fetch request should load the share partition. The scenario is not very different than the partition being shuffled across brokers. The broker that is the leader of the partition should be able to load the share partition. > The listener is called under a lock in the Partition. The listener implementation needs to be very careful not to do anything brave under that lock. I expect there is potential for a deadlock here. The lock is for each Partition, but I do get the point. I can make the call outside the lock as well; I'll wait for @junrao's comments as he can help. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
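One common way to address the deadlock concern raised in this thread is to snapshot the listeners while holding the lock and invoke them only after releasing it. A minimal sketch of that pattern (Python; illustrative only, not the broker's actual Partition code):

```python
import threading

class PartitionLike:
    """Toy stand-in for a Partition that notifies listeners on state changes."""
    def __init__(self):
        self._lock = threading.Lock()
        self._listeners = []

    def add_listener(self, cb):
        with self._lock:
            self._listeners.append(cb)

    def on_state_change_unsafe(self, event):
        # Risky: callbacks run while the Partition lock is held, so a
        # callback that takes other locks can deadlock.
        with self._lock:
            for cb in self._listeners:
                cb(event)

    def on_state_change_safe(self, event):
        # Snapshot under the lock, invoke outside it.
        with self._lock:
            listeners = list(self._listeners)
        for cb in listeners:
            cb(event)

p = PartitionLike()
seen = []
p.add_listener(lambda e: seen.append((e, p._lock.locked())))
p.on_state_change_safe("follower")
print(seen)  # lock is not held while the callback runs
```

The trade-off is that a listener may observe a state slightly newer than the event it is notified about, which is usually acceptable for cleanup-style listeners like the one in this PR.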
[jira] [Commented] (KAFKA-17917) Convert Kafka core system tests to use KRaft
[ https://issues.apache.org/jira/browse/KAFKA-17917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17898023#comment-17898023 ] Azhar Ahmed commented on KAFKA-17917: - Hi [~kevinwu2412], I would like to participate if you need an extra pair of hands. > Convert Kafka core system tests to use KRaft > > > Key: KAFKA-17917 > URL: https://issues.apache.org/jira/browse/KAFKA-17917 > Project: Kafka > Issue Type: Improvement > Components: core, system tests >Affects Versions: 4.0.0 >Reporter: Kevin Wu >Priority: Blocker > > The downgrade, group mode transactions, security rolling upgrade, and > throttling tests should be migrated to using KRaft. The network degrade test > should be refactored to use KafkaService rather than ZookeeperService. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR Fix a few test names [kafka]
mumrah commented on PR #17788: URL: https://github.com/apache/kafka/pull/17788#issuecomment-2474414294 Here's what the catalog diff will be with this PR: ``` diff --git a/test-catalog/clients/tests.yaml b/test-catalog/clients/tests.yaml index 4b39d4f069..51d7850671 100644 --- a/test-catalog/clients/tests.yaml +++ b/test-catalog/clients/tests.yaml @@ -816,12 +816,12 @@ org.apache.kafka.clients.consumer.OffsetAndMetadataTest: - testInvalidNegativeOffset - testSerializationRoundtrip org.apache.kafka.clients.consumer.RangeAssignorTest: -- rackConfig - testCoPartitionedAssignmentWithSameSubscription - testMultipleConsumersMixedTopics - testOneConsumerMultipleTopics - testOneConsumerNoTopic - testOneConsumerNonexistentTopic +- testOneConsumerOneTopic - testOneStaticConsumerAndOneDynamicConsumerTwoTopicsSixPartitions - testOnlyAssignsPartitionsFromSubscribedTopics - testRackAwareAssignmentWithCoPartitioning @@ -3230,7 +3230,9 @@ org.apache.kafka.common.metrics.stats.RateTest: - testRateIsConsistentAfterTheFirstWindow - testRateWithNoPriorAvailableSamples org.apache.kafka.common.metrics.stats.SampledStatTest: -- Sample +- testSampleIsKeptIfOverlaps +- testSampleIsKeptIfOverlapsAndExtra +- testSampleIsPurgedIfDoesntOverlap org.apache.kafka.common.network.ChannelBuildersTest: - testChannelBuilderConfigs - testCreateConfigurableKafkaPrincipalBuilder @@ -3353,7 +3355,7 @@ org.apache.kafka.common.network.SslTransportTls12Tls13Test: - testCiphersSuiteForTls12FailsForTls13 - testCiphersSuiteForTls13 org.apache.kafka.common.network.SslVersionsTransportLayerTest: -- tlsServerProtocol +- testTlsDefaults org.apache.kafka.common.network.Tls12SelectorTest: - registerFailure - testBytesBufferedChannelAfterMute @@ -3616,7 +3618,7 @@ org.apache.kafka.common.record.FileRecordsTest: - testTruncateNotCalledIfSizeIsBiggerThanTargetSize - testTruncateNotCalledIfSizeIsSameAsTargetSize org.apache.kafka.common.record.LazyDownConversionRecordsTest: -- compressionType +- testConversion - 
testConversionOfCommitMarker org.apache.kafka.common.record.LegacyRecordTest: - testChecksum diff --git a/test-catalog/storage/tests.yaml b/test-catalog/storage/tests.yaml index 8a26477731..f978d606d9 100644 --- a/test-catalog/storage/tests.yaml +++ b/test-catalog/storage/tests.yaml @@ -27,8 +27,8 @@ org.apache.kafka.server.log.remote.metadata.storage.RemoteLogLeaderEpochStateTes - testListAllRemoteLogSegmentsShouldReturnSortedSegments - testListAllRemoteLogSegmentsShouldThrowErrorForUnknownSegmentId org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCacheTest: -- isInitialized - testCacheAddMetadataOnInvalidArgs +- testCacheUpdateMetadataOnInvalidArgs - testDropEventOnInvalidStateTransition org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataFormatterTest: - testFormat diff --git a/test-catalog/streams/integration-tests/tests.yaml b/test-catalog/streams/integration-tests/tests.yaml index 318a98c716..cade38190f 100644 --- a/test-catalog/streams/integration-tests/tests.yaml +++ b/test-catalog/streams/integration-tests/tests.yaml @@ -142,9 +142,10 @@ org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest: org.apache.kafka.streams.integration.KafkaStreamsCloseOptionsIntegrationTest: - testCloseOptions org.apache.kafka.streams.integration.KafkaStreamsTelemetryIntegrationTest: -- Correct -- End -- Streams +- passedMetricsShouldNotLeakIntoClientMetrics +- shouldPassCorrectMetricsDynamicInstances +- shouldPassMetrics +- shouldPushMetricsToBroker org.apache.kafka.streams.integration.LagFetchIntegrationTest: - shouldFetchLagsDuringRebalancingWithNoOptimization - shouldFetchLagsDuringRebalancingWithOptimization diff --git a/test-catalog/streams/test-utils/tests.yaml b/test-catalog/streams/test-utils/tests.yaml index 626d4ec93c..6361f616c6 100644 --- a/test-catalog/streams/test-utils/tests.yaml +++ b/test-catalog/streams/test-utils/tests.yaml @@ -155,7 +155,7 @@ org.apache.kafka.streams.test.MockProcessorContextAPITest: 
- shouldCaptureRecordsOutputToChildByName - shouldStoreAndReturnStateStores org.apache.kafka.streams.test.MockProcessorContextStateStoreTest: -- builder +- shouldEitherInitOrThrow org.apache.kafka.streams.test.TestRecordTest: - testConsumerRecord - testEqualsAndHashCode diff --git a/test-catalog/streams/tests.yaml b/test-catalog/streams/tests.yaml index 51a6115e96..59347bc0cf 100644 --- a/test-catalog/streams/tests.yaml +++ b/test-catalog/streams/tests.yaml @@ -421,10 +421,14 @@ org.apache.kafka.streams.internals.metrics.ClientMetricsTest: - shouldAddVersionMetric - shouldGetFailedStreamThreadsSensor org.apache.kafka.stream ```
Re: [PR] KAFKA-17747: Trigger rebalance on rack topology changes [kafka]
jeffkbkim commented on code in PR #17444: URL: https://github.com/apache/kafka/pull/17444#discussion_r1840991727 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java: ## @@ -526,12 +527,12 @@ public void testReplayConsumerGroupPartitionMetadata() { ConsumerGroupPartitionMetadataKey key = new ConsumerGroupPartitionMetadataKey(); ConsumerGroupPartitionMetadataValue value = new ConsumerGroupPartitionMetadataValue(); -coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new CoordinatorRecord( +// ConsumerGroupPartitionMetadataKey/Value is deprecated after 4.0. Review Comment: What does this mean? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: convert GssapiAuthenticationTest to KRaft [kafka]
mjsax commented on code in PR #17786: URL: https://github.com/apache/kafka/pull/17786#discussion_r1841030801 ## core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala: ## @@ -109,7 +110,8 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup { * are able to connect after the second re-login. Verifies that logout is performed only once * since duplicate logouts without successful login results in NPE from Java 9 onwards. */ - @Test + @ParameterizedTest + @ValueSource(strings = Array("kraft")) Review Comment: There is no input parameter to `testLoginFailure()` -- why change this to being a parameterized test? (Same below.)
Re: [PR] MINOR: convert GssapiAuthenticationTest to KRaft [kafka]
mjsax commented on code in PR #17786: URL: https://github.com/apache/kafka/pull/17786#discussion_r1841029706 ## core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala: ## @@ -92,15 +92,16 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup { * Tests that Kerberos replay error `Request is a replay (34)` is not handled as an authentication exception * since replay detection used to detect DoS attacks may occasionally reject valid concurrent requests. */ - @Test - def testRequestIsAReplay(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("kraft")) + def testRequestIsAReplay(quorum: String): Unit = { Review Comment: Why is `quorum` an input parameter to this test? Seems the value would always be `"kraft"`? -- I also don't see where `quorum` is actually used?
Re: [PR] KAFKA-16985: Ensure consumer attempts to send leave request on close even if interrupted [kafka]
lianetm merged PR #16686: URL: https://github.com/apache/kafka/pull/16686
[jira] [Resolved] (KAFKA-17518) AsyncKafkaConsumer cannot reliably leave group when closed with small timeout
[ https://issues.apache.org/jira/browse/KAFKA-17518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans resolved KAFKA-17518. Resolution: Fixed > AsyncKafkaConsumer cannot reliably leave group when closed with small timeout > - > > Key: KAFKA-17518 > URL: https://issues.apache.org/jira/browse/KAFKA-17518 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.9.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 4.0.0 > > > If {{close()}} is called with a short timeout (e.g. 0 ms), the process cannot > complete, leading to the consumer remaining in the consumer group. > On {{{}close(){}}}, the consumer attempts to unsubscribe and leave the > consumer group. This process requires hops back and forth between the > application and background threads to call the > {{{}ConsumerRebalanceListener{}}}. Those hops add a nonzero amount of time to > the close step. > The events used to communicate between the application and background threads > are based on the timeout provided by the user. If the timeout is not > sufficient, the events will expire, and the process will be left incomplete. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17519) Define and validate correctness of Consumer.close() and its timeout when thread is interrupted
[ https://issues.apache.org/jira/browse/KAFKA-17519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17898032#comment-17898032 ] Lianet Magrans commented on KAFKA-17519: [~kirktrue] what exactly do we want to achieve with this one, now that KAFKA-16985 is fixed? (including the comments you added on close that seem to have documented your findings: https://github.com/apache/kafka/blob/b6b2c9ebc45bd60572c24355886620dbdc406ce9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1238-L1299) Asking just to close this one if it's already solved by KAFKA-16985. Thanks! > Define and validate correctness of Consumer.close() and its timeout when > thread is interrupted > -- > > Key: KAFKA-17519 > URL: https://issues.apache.org/jira/browse/KAFKA-17519 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.9.0 >Reporter: Kirk True >Assignee: PoAn Yang >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 4.0.0 > > > The repercussions of a thread's interrupt status on {{Consumer.close()}} and > its timeout are not well defined. It _appears_ that the > {{ClassicKafkaConsumer}} will continue to attempt to close all its resources > even if an interrupt was triggered prior to—or during—the call to {{close()}}, > though it effectively ignores the user's supplied timeout since each call to > {{NetworkClient.poll()}} will throw an {{InterruptException}} after first > making an attempt to poll the socket. > The task here is to review the existing code, verify the behavior with some > unit/integration tests, and document it. Furthermore, once the intended > behavior has been confirmed, the {{AsyncKafkaConsumer}} should be updated to > behave likewise. 
Re: [PR] KAFKA-17648: AsyncKafkaConsumer#unsubscribe swallow TopicAuthorizationException and GroupAuthorizationException [kafka]
lianetm commented on code in PR #17516: URL: https://github.com/apache/kafka/pull/17516#discussion_r1840374672 ## core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala: ## @@ -132,6 +133,136 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest { assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testConsumeUnsubscribeWithoutTopicPermission(quorum: String, groupProtocol: String): Unit = { Review Comment: I wonder if this is better covered at the unit test level only. Here I don't see how we can trust that the test is actually testing the unsubscribe changes. The trick is that the topic error comes in a metadata response, but the unsubscribe completes successfully as soon as it gets a response to the HB, so it will always complete OK unless we get a metadata response before the HB response, right? It's a really small window, btw, because we only process background events (where errors are discovered) from the moment we send the unsubscribe (leave HB) to the moment we get a response. Then we stop processing background events, so nothing will be thrown even if an error arrives in a response. The other `testConsumeUnsubscribeWithoutGroupPermission` makes sense to me, because the group error comes in a HB response, as does the unsubscribe response, so we can trust that if the unsubscribe does not throw, it is because we're indeed swallowing the exception.
[jira] [Commented] (KAFKA-12469) The topic names in the metrics do not retain their format when extracting through JMX.
[ https://issues.apache.org/jira/browse/KAFKA-12469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17898033#comment-17898033 ] Apoorv Mittal commented on KAFKA-12469: --- [~chia7712] [~junrao] If we need this, then it might be worth including it in the Kafka 4.0 major release. I have already started a discussion thread. > The topic names in the metrics do not retain their format when extracting > through JMX. > -- > > Key: KAFKA-12469 > URL: https://issues.apache.org/jira/browse/KAFKA-12469 > Project: Kafka > Issue Type: Bug > Components: consumer, metrics >Reporter: Rafał Chmielewski >Assignee: Apoorv Mittal >Priority: Major > Attachments: JConsole - Kafka Client.png, Screenshot 2024-11-12 at > 21.35.59.png, image-2024-11-12-21-32-55-180.png > > > I have topic names that have a period in the name: > product.order > product.offering.price > > However, for the metrics issued by JMX by a program that is a consumer of > Kafka messages, the dots are replaced with an underscore: > kafka.consumer client-id=consumer-export-4, topic=product_offering_price, > partition=1><>records-lead > > This creates a problem if I want to calculate the customer's lag in relation > to the number of messages on Kafka. > > But for the producer, this problem doesn't occur: > kafka.producer client-id=bss.data.verification.pi_1, > topic=product.offering.price><>record-send-total > > As a consumer I use Akka Alpakka. But I think it's using the Apache library to > connect to Kafka and report metrics via JMX.
Re: [PR] KAFKA-17783: Adding listeners to remove share partition on partition changes [kafka]
mumrah commented on PR #17796: URL: https://github.com/apache/kafka/pull/17796#issuecomment-2474632453 @apoorvmittal10 what is the purpose of this listener? Is it so that SPM can clean up its in-memory state when it is no longer the leader for a partition? If that's the main use case, I think we should consider tying into the metadata system directly rather than coupling ourselves to Partition. @dajac, since it looks like you added it, maybe you can comment on the intended usage of PartitionListener?
[jira] [Assigned] (KAFKA-18006) add 3.9.0 to end-to-end test
[ https://issues.apache.org/jira/browse/KAFKA-18006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-18006: -- Assignee: TengYao Chi (was: Chia-Ping Tsai) > add 3.9.0 to end-to-end test > > > Key: KAFKA-18006 > URL: https://issues.apache.org/jira/browse/KAFKA-18006 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: TengYao Chi >Priority: Minor > > as title
[jira] [Created] (KAFKA-18006) add 3.9.0 to end-to-end test
Chia-Ping Tsai created KAFKA-18006: -- Summary: add 3.9.0 to end-to-end test Key: KAFKA-18006 URL: https://issues.apache.org/jira/browse/KAFKA-18006 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai as title
Re: [PR] KAFKA-15995: Initial API + make Producer/Consumer plugins Monitorable [kafka]
mimaison commented on PR #17511: URL: https://github.com/apache/kafka/pull/17511#issuecomment-2473712965 I rebased on trunk to resolve the conflicts.
[jira] [Commented] (KAFKA-18006) add 3.9.0 to end-to-end test
[ https://issues.apache.org/jira/browse/KAFKA-18006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17897928#comment-17897928 ] TengYao Chi commented on KAFKA-18006: - Hi [~chia7712], I would like to take this issue if you haven't started working on it. > add 3.9.0 to end-to-end test > > > Key: KAFKA-18006 > URL: https://issues.apache.org/jira/browse/KAFKA-18006 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: TengYao Chi >Priority: Minor > > as title
[jira] [Commented] (KAFKA-18001) KafkaNetworkChannel missing UpdateRaftVoterRequestData logic
[ https://issues.apache.org/jira/browse/KAFKA-18001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17897954#comment-17897954 ] Omnia Ibrahim commented on KAFKA-18001: --- Hi [~gnarula] and [~alyssahuang], I believe this is a duplicate of my bug report in KAFKA-17996. > KafkaNetworkChannel missing UpdateRaftVoterRequestData logic > > > Key: KAFKA-18001 > URL: https://issues.apache.org/jira/browse/KAFKA-18001 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.9.0, 3.9.1 >Reporter: Alyssa Huang >Priority: Major > > buildRequest needs an if case for UpdateRaftVoterRequestData
Re: [PR] MINOR Fix a few test names [kafka]
mumrah commented on PR #17788: URL: https://github.com/apache/kafka/pull/17788#issuecomment-2473827213 @chia7712 I'm not sure it's preventable. It doesn't seem to be a widespread issue though, so hopefully fixing these will prevent copy/paste propagation of the issue. We should probably consider writing some testing guidelines to mention issues like this.
[jira] [Created] (KAFKA-18007) MirrorCheckpointConnector fails with “Timeout while loading consumer groups” after upgrading to Kafka 3.9.0
Asker created KAFKA-18007: - Summary: MirrorCheckpointConnector fails with “Timeout while loading consumer groups” after upgrading to Kafka 3.9.0 Key: KAFKA-18007 URL: https://issues.apache.org/jira/browse/KAFKA-18007 Project: Kafka Issue Type: Bug Components: mirrormaker Affects Versions: 3.9.0 Environment: - Kafka Version: Upgraded sequentially from 3.6.0 to 3.9.0 - Clusters: Three clusters named A, B, and C - Clusters A and B mirror topics to cluster C using MirrorMaker 2 - Number of Consumer Groups: Approximately 200 - Number of Topics: Approximately 2000 - Operating System: Ubuntu 20.04.5 LTS (GNU/Linux 5.4.0-135-generic x86_64) Reporter: Asker After upgrading our Kafka clusters from version 3.6.0 to 3.9.0, we started experiencing repeated errors with the MirrorCheckpointConnector in MirrorMaker 2. The connector fails with a RetriableException stating “Timeout while loading consumer groups.” This issue persists despite several attempts to resolve it. Error Message: {code:bash} Nov 11 12:21:57 kafka-analytics-2a.dev.vm.tech connect-mirror-maker.sh[2526630]: [2024-11-11 12:21:57,342] ERROR [Worker clientId=analytics-dev->app-dev, groupId=analytics-dev-mm2] Failed to reconfigure connector's tasks (MirrorCheckpointConnector), retrying after backoff. (org.apache.kafka.connect.runtime.distributed.DistributedHerder:2195) Nov 11 12:21:57 kafka-analytics-2a.dev.vm.tech connect-mirror-maker.sh[2526630]: org.apache.kafka.connect.errors.RetriableException: Timeout while loading consumer groups. 
Nov 11 12:21:57 kafka-analytics-2a.dev.vm.tech connect-mirror-maker.sh[2526630]: at org.apache.kafka.connect.mirror.MirrorCheckpointConnector.taskConfigs(MirrorCheckpointConnector.java:138) Nov 11 12:21:57 kafka-analytics-2a.dev.vm.tech connect-mirror-maker.sh[2526630]: at org.apache.kafka.connect.runtime.Worker.connectorTaskConfigs(Worker.java:398) Nov 11 12:21:57 kafka-analytics-2a.dev.vm.tech connect-mirror-maker.sh[2526630]: at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:2243) Nov 11 12:21:57 kafka-analytics-2a.dev.vm.tech connect-mirror-maker.sh[2526630]: at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithExponentialBackoffRetries(DistributedHerder.java:2183) Nov 11 12:21:57 kafka-analytics-2a.dev.vm.tech connect-mirror-maker.sh[2526630]: at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$47(DistributedHerder.java:2199) Nov 11 12:21:57 kafka-analytics-2a.dev.vm.tech connect-mirror-maker.sh[2526630]: at org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2402) Nov 11 12:21:57 kafka-analytics-2a.dev.vm.tech connect-mirror-maker.sh[2526630]: at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:498) Nov 11 12:21:57 kafka-analytics-2a.dev.vm.tech connect-mirror-maker.sh[2526630]: at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:383) Nov 11 12:21:57 kafka-analytics-2a.dev.vm.tech connect-mirror-maker.sh[2526630]: at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) Nov 11 12:21:57 kafka-analytics-2a.dev.vm.tech connect-mirror-maker.sh[2526630]: at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) Nov 11 12:21:57 kafka-analytics-2a.dev.vm.tech connect-mirror-maker.sh[2526630]: at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) Nov 11 12:21:57 kafka-analytics-2a.dev.vm.tech connect-mirror-maker.sh[2526630]: at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) Nov 11 12:21:57 kafka-analytics-2a.dev.vm.tech connect-mirror-maker.sh[2526630]: at java.base/java.lang.Thread.run(Thread.java:840){code} Steps to Reproduce: 1. Upgrade Kafka clusters sequentially from 3.6.0 to 3.9.0. 2. Configure MirrorMaker 2 to mirror topics from clusters A and B to cluster C. 3. Start MirrorMaker 2. 4. Observe the logs for the MirrorCheckpointConnector. What We Tried: {*}Checked ACLs and Authentication{*}: - Ensured that the mirror_maker user has the necessary permissions and can authenticate successfully. - Verified that we could list consumer groups using kafka-consumer-groups.sh with the mirror_maker user. {*}Increased Timeouts{*}: - Increased admin.timeout.ms to 300000 (5 minutes) and even higher values. - Adjusted admin.request.timeout.ms and admin.retry.backoff.ms accordingly. {*}Enabled Detailed Logging{*}: - Set the logging level to DEBUG for org.apache.kafka.connect.mirror to gain more insights. - No additional information that could help resolve the issue was found. {*
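For context, a minimal sketch of how the timeout overrides described in the report are typically expressed in a MirrorMaker 2 properties file. The cluster aliases are taken from the report above; the values and overall layout are illustrative, not the reporter's actual configuration:

```properties
# Illustrative MirrorMaker 2 snippet (aliases from the report; values are examples)
clusters = analytics-dev, app-dev
analytics-dev->app-dev.enabled = true

# Timeout knobs mentioned in the report; admin.timeout.ms raised to 5 minutes
admin.timeout.ms = 300000
admin.request.timeout.ms = 300000
admin.retry.backoff.ms = 1000
```

Per the report, raising these values did not resolve the "Timeout while loading consumer groups" error.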
Re: [PR] KAFKA-17593; [5/N] Include resolved regular expressions into target assignment computation [kafka]
dajac merged PR #17750: URL: https://github.com/apache/kafka/pull/17750
Re: [PR] KAFKA-16985: Ensure consumer attempts to send leave request on close even if interrupted [kafka]
kirktrue commented on code in PR #16686: URL: https://github.com/apache/kafka/pull/16686#discussion_r1840505903 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java: ## @@ -549,31 +578,40 @@ public CompletableFuture leaveGroup() { CompletableFuture leaveResult = new CompletableFuture<>(); leaveGroupInProgress = Optional.of(leaveResult); -CompletableFuture callbackResult = signalMemberLeavingGroup(); -callbackResult.whenComplete((result, error) -> { -if (error != null) { -log.error("Member {} callback to release assignment failed. It will proceed " + -"to clear its assignment and send a leave group heartbeat", memberId, error); -} else { -log.info("Member {} completed callback to release assignment. It will proceed " + -"to clear its assignment and send a leave group heartbeat", memberId); -} - -// Clear the subscription, no matter if the callback execution failed or succeeded. -subscriptions.unsubscribe(); -clearAssignment(); +if (runCallbacks) { +CompletableFuture callbackResult = signalMemberLeavingGroup(); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.error("Member {} callback to release assignment failed. It will proceed " + +"to clear its assignment and send a leave group heartbeat", memberId, error); +} else { +log.info("Member {} completed callback to release assignment. It will proceed " + +"to clear its assignment and send a leave group heartbeat", memberId); +} -// Transition to ensure that a heartbeat request is sent out to effectively leave the -// group (even in the case where the member had no assignment to release or when the -// callback execution failed.) -transitionToSendingLeaveGroup(false); -}); +// Clear the assignment, no matter if the callback execution failed or succeeded. 
+clearAssignmentAndLeaveGroup(); +}); +} else { +clearAssignmentAndLeaveGroup(); +} // Return future to indicate that the leave group is done when the callbacks // complete, and the transition to send the heartbeat has been made. return leaveResult; } +private void clearAssignmentAndLeaveGroup() { +subscriptions.unsubscribe(); +clearAssignment(); +notifyAssignmentChange(Collections.emptySet()); Review Comment: Correct. I removed the extra call since `clearAssignment()` handles it where appropriate anyway. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1216,6 +1235,69 @@ public void close(Duration timeout) { } } +/** + * Please keep these tenets in mind for the implementation of the {@link AsyncKafkaConsumer}’s + * {@link #close(Duration)} method. In the future, these tenets may be made officially part of the top-level + * {@link KafkaConsumer#close(Duration)} API, but for now they remain here. + * + * + * + * The execution of the {@link ConsumerRebalanceListener} callback (if applicable) must be performed on + * the application thread to ensure it does not interfere with the network I/O on the background thread. + * + * + * The {@link ConsumerRebalanceListener} callback execution must complete before an attempt to leave + * the consumer group is performed. In this context, “complete” does not necessarily imply + * success; execution is “complete” even if the execution fails with an error. + * + * + * Any error thrown during the {@link ConsumerRebalanceListener} callback execution will be caught to + * ensure it does not prevent execution of the remaining {@link #close()} logic. + * + * + * The application thread will be blocked during the entire duration of the execution of the + * {@link ConsumerRebalanceListener}. The consumer does not employ a mechanism to short-circuit the + * callback execution, so execution is not bound by the timeout in {@link #close(Duration)}. 
+ * + * + * A given {@link ConsumerRebalanceListener} implementation may be affected by the application thread's + * interrupt state. If the callback implementation performs any blocking operations, it may result in + * an error. An implementation may choose to preemptively check the thread's interrupt flag via + * {@link Thread#isI
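The interrupt-flag tenet in the javadoc above can be illustrated with a small, self-contained sketch. This is plain Java with no Kafka types; `runCallback` is a hypothetical stand-in for a user's `ConsumerRebalanceListener` callback, not Kafka code:

```java
public class InterruptAwareCallbackSketch {

    // Hypothetical user callback that preemptively checks the application
    // thread's interrupt status before doing blocking work, as the tenet
    // above suggests an implementation may choose to do.
    static String runCallback() {
        if (Thread.currentThread().isInterrupted()) {
            // Skip blocking work; the interrupt flag is left set for the caller.
            return "skipped";
        }
        return "ran";
    }

    public static void main(String[] args) {
        System.out.println(runCallback());          // "ran"
        Thread.currentThread().interrupt();
        System.out.println(runCallback());          // "skipped"
        // Thread.interrupted() reports true (and clears the flag), showing
        // the callback preserved the caller's interrupt status.
        System.out.println(Thread.interrupted());
    }
}
```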
Re: [PR] KAFKA-18001: Support UpdateRaftVoterRequest in KafkaNetworkChannel (WIP) [kafka]
OmniaGM commented on code in PR #17773: URL: https://github.com/apache/kafka/pull/17773#discussion_r1840517434 ## raft/src/main/java/org/apache/kafka/raft/KafkaNetworkChannel.java: ## @@ -187,6 +189,8 @@ static AbstractRequest.Builder buildRequest(ApiMessag return new FetchRequest.SimpleBuilder((FetchRequestData) requestData); if (requestData instanceof FetchSnapshotRequestData) return new FetchSnapshotRequest.Builder((FetchSnapshotRequestData) requestData); +if (requestData instanceof UpdateRaftVoterRequestData) Review Comment: I believe `RemoveRaftVoterRequestData` needs to be added as well.
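The shape of the change under discussion can be sketched as follows. This is a simplified, hypothetical model of the `buildRequest` instanceof-dispatch chain; the class names below are stand-ins, not Kafka's real request types, and the method returns a string rather than a real request builder:

```java
public class RaftRequestDispatchSketch {
    interface ApiPayload {}
    static class FetchData implements ApiPayload {}
    static class UpdateRaftVoterData implements ApiPayload {}
    static class RemoveRaftVoterData implements ApiPayload {}

    // Each supported RPC payload type needs its own branch; a payload type
    // without a branch cannot be sent over the channel at all.
    static String builderFor(ApiPayload requestData) {
        if (requestData instanceof FetchData)
            return "FetchRequest.Builder";
        if (requestData instanceof UpdateRaftVoterData)
            return "UpdateRaftVoterRequest.Builder";   // branch this PR adds
        if (requestData instanceof RemoveRaftVoterData)
            return "RemoveRaftVoterRequest.Builder";   // branch the reviewer asks about
        throw new IllegalArgumentException("Unexpected type: " + requestData.getClass());
    }

    public static void main(String[] args) {
        System.out.println(builderFor(new UpdateRaftVoterData()));
        System.out.println(builderFor(new RemoveRaftVoterData()));
    }
}
```

The fall-through to an exception mirrors why a missing branch surfaces as a hard error rather than a silently dropped request.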