[jira] [Resolved] (KAFKA-18831) Migrating to log4j2 introduces behavior changes when adjusting levels dynamically
[ https://issues.apache.org/jira/browse/KAFKA-18831?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-18831.
Resolution: Fixed
trunk: https://github.com/apache/kafka/commit/d31cbf59dee6a77c1c673db06900ecc238894ed7
4.0: https://github.com/apache/kafka/commit/d3791c39e3e426161ea4f8fd95ca3c97b049dd95

> Migrating to log4j2 introduces behavior changes when adjusting levels dynamically
> Key: KAFKA-18831 URL: https://issues.apache.org/jira/browse/KAFKA-18831
> Project: Kafka Issue Type: Bug Reporter: Chia-Ping Tsai Assignee: TengYao Chi Priority: Blocker Fix For: 4.0.0
>
> log4j 2 introduces two behavior changes:
> 1) In log4j 1, a logger's level cannot be changed through its parent if the logger is declared explicitly in the properties file. For example, `org.apache.kafka.controller` has an explicit level in the properties, so "org.apache.kafka=INFO" cannot change the level of `org.apache.kafka.controller` to INFO. By contrast, log4j 2 allows all child loggers to be changed through the parent logger.
> 2) In log4j 2, changing the root logger's level affects the level of all loggers. log4j 1 cannot do this.
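For context, a minimal sketch of how the two log4j 2 behaviors above can be exercised through its `Configurator` API; the logger names are illustrative, and this is not the code from the fix itself:

```java
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;

public class Log4j2DynamicLevels {
    public static void main(String[] args) {
        // Behavior 1: in log4j 2, updating a parent logger updates all of its
        // descendants, even loggers declared explicitly in the configuration.
        Configurator.setAllLevels("org.apache.kafka", Level.INFO);

        // Behavior 2: in log4j 2, the root level can be changed at runtime and
        // affects every logger; log4j 1 offered no equivalent.
        Configurator.setRootLevel(Level.DEBUG);
    }
}
```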
Re: [PR] KAFKA-18831: Migrating to log4j2 introduces behavior changes when adjusting levels dynamically [kafka]
chia7712 commented on PR #18969: URL: https://github.com/apache/kafka/pull/18969#issuecomment-2673874056 cherry-pick to 4.0
[jira] [Commented] (KAFKA-18841) Enable testing the Docker image locally
[ https://issues.apache.org/jira/browse/KAFKA-18841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17929069#comment-17929069 ] PoAn Yang commented on KAFKA-18841:
Hi [~chia7712], I think we can move the build step into the Dockerfile and use a final image to get the result. If you're not working on this, I can work on it. Thank you.

> Enable testing the Docker image locally
> Key: KAFKA-18841 URL: https://issues.apache.org/jira/browse/KAFKA-18841
> Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai Priority: Major
>
> Currently, the Docker image assumes the distribution comes from the release page, which causes trouble in local testing:
> 1) we need to run an HTTP server to offer a downloadable URL
> 2) we need to pass the GPG check
[jira] [Assigned] (KAFKA-18841) Enable testing the Docker image locally
[ https://issues.apache.org/jira/browse/KAFKA-18841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-18841:
Assignee: PoAn Yang (was: Chia-Ping Tsai)

> Enable testing the Docker image locally
> Key: KAFKA-18841 URL: https://issues.apache.org/jira/browse/KAFKA-18841
> Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: PoAn Yang Priority: Major
>
> Currently, the Docker image assumes the distribution comes from the release page, which causes trouble in local testing:
> 1) we need to run an HTTP server to offer a downloadable URL
> 2) we need to pass the GPG check
Re: [PR] MINOR: Add a separate page for zk2kraft.html [kafka]
chia7712 commented on code in PR #18961: URL: https://github.com/apache/kafka/pull/18961#discussion_r1965108766

## docs/toc.html:
@@ -27,6 +27,9 @@
 1.3 Quick Start
 1.4 Ecosystem
 1.5 Upgrading
+
+1.5.1 Removed Zookeeper Configurations and Metrics

Review Comment: Why not use the title "Differences Between KRaft mode and Zookeeper mode"?
Re: [PR] KAFKA-18737: KafkaDockerWrapper setup functions fail due to storage format command [kafka]
chia7712 commented on code in PR #18844: URL: https://github.com/apache/kafka/pull/18844#discussion_r1965105936

## core/src/main/scala/kafka/docker/KafkaDockerWrapper.scala:
@@ -52,7 +53,14 @@ object KafkaDockerWrapper extends Logging {
 }
 val formatCmd = formatStorageCmd(finalConfigsPath, envVars)
-StorageTool.main(formatCmd)
+try {
+  StorageTool.main(formatCmd)
+} catch {
+  case terseFailure: TerseFailure => if (terseFailure.getMessage.contains(QuorumConfig.QUORUM_VOTERS_CONFIG)) {
+    throw new TerseFailure("The Docker image does not support Dynamic Quorum yet.")

Review Comment: Could you please add `terseFailure` as the cause of the new `TerseFailure`?
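The real code above is Scala; below is a Java-flavored sketch of the reviewer's suggestion, using a stand-in `TerseFailure` class and assuming a `(String, Throwable)` constructor is available, so the original failure survives as the cause:

```java
// Stand-in for Kafka's TerseFailure, assuming it can carry a cause.
class TerseFailure extends RuntimeException {
    TerseFailure(String message) { super(message); }
    TerseFailure(String message, Throwable cause) { super(message, cause); }
}

public class ChainedTerseFailureSketch {
    // Assumed value of QuorumConfig.QUORUM_VOTERS_CONFIG.
    static final String QUORUM_VOTERS_CONFIG = "controller.quorum.voters";

    static void formatStorage() {
        try {
            runStorageTool();
        } catch (TerseFailure e) {
            if (e.getMessage().contains(QUORUM_VOTERS_CONFIG)) {
                // Chain the original failure so the root cause is not lost.
                throw new TerseFailure("The Docker image does not support Dynamic Quorum yet.", e);
            }
            throw e;
        }
    }

    // Stand-in for StorageTool.main(formatCmd) failing on dynamic quorum input.
    static void runStorageTool() {
        throw new TerseFailure("Unexpected " + QUORUM_VOTERS_CONFIG + " in configuration");
    }

    public static void main(String[] args) {
        try {
            formatStorage();
        } catch (TerseFailure e) {
            System.out.println(e.getMessage() + " (cause: " + e.getCause().getMessage() + ")");
        }
    }
}
```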
Re: [PR] KAFKA-18823: native Docker does not honor the result of checking the signing key [kafka]
Parkerhiphop closed pull request #18984: KAFKA-18823: native Docker does not honor the result of checking the signing key URL: https://github.com/apache/kafka/pull/18984
[jira] [Created] (KAFKA-18841) Enable testing the Docker image locally
Chia-Ping Tsai created KAFKA-18841:
Summary: Enable testing the Docker image locally
Key: KAFKA-18841 URL: https://issues.apache.org/jira/browse/KAFKA-18841
Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai

Currently, the Docker image assumes the distribution comes from the release page, which causes trouble in local testing:
1) we need to run an HTTP server to offer a downloadable URL
2) we need to pass the GPG check
Re: [PR] KAFKA-18737: KafkaDockerWrapper setup functions fail due to storage format command [kafka]
chia7712 commented on code in PR #18844: URL: https://github.com/apache/kafka/pull/18844#discussion_r1965070367

## docker/jvm/Dockerfile:
@@ -22,20 +22,24 @@ USER root
 # Get kafka from https://archive.apache.org/dist/kafka and pass the url through build arguments
 ARG kafka_url
+ARG skip_signing=false

Review Comment: hi @frankvicky @VedarthConfluent I've opened KAFKA-18841 to enable local testing of the Docker image. Copying the local distribution without the GPG check seems preferable. Maybe we can revert the workaround and streamline this PR?
[jira] [Commented] (KAFKA-18841) Enable testing the Docker image locally
[ https://issues.apache.org/jira/browse/KAFKA-18841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17929066#comment-17929066 ] Chia-Ping Tsai commented on KAFKA-18841:
Maybe we should offer a way to copy the distribution from the local filesystem and skip the GPG check.

> Enable testing the Docker image locally
> Key: KAFKA-18841 URL: https://issues.apache.org/jira/browse/KAFKA-18841
> Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai Priority: Major
>
> Currently, the Docker image assumes the distribution comes from the release page, which causes trouble in local testing:
> 1) we need to run an HTTP server to offer a downloadable URL
> 2) we need to pass the GPG check
Re: [PR] KAFKA-18737: KafkaDockerWrapper setup functions fail due to storage format command [kafka]
chia7712 commented on code in PR #18844: URL: https://github.com/apache/kafka/pull/18844#discussion_r1965100766

## core/src/main/scala/kafka/docker/KafkaDockerWrapper.scala:
@@ -52,7 +53,14 @@ object KafkaDockerWrapper extends Logging {
(same hunk as in the previous comment)

Review Comment: This error message is too short. Maybe we should say: "To maximize compatibility, the Docker image continues to use static voters, which are supported in 3.7 and later."
[jira] [Commented] (KAFKA-17622) Kafka Streams Timeout During Partition Rebalance
[ https://issues.apache.org/jira/browse/KAFKA-17622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17929081#comment-17929081 ] Alieh Saeedi commented on KAFKA-17622:
KIP-1094 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-1094%3A+Add+a+new+constructor+method+with+nextOffsets+to+ConsumerRecords, PR: https://github.com/apache/kafka/pull/17414), along with this fix: https://github.com/apache/kafka/pull/17091, prevents the timeout issue in such scenarios.

> Kafka Streams Timeout During Partition Rebalance
> Key: KAFKA-17622 URL: https://issues.apache.org/jira/browse/KAFKA-17622
> Project: Kafka Issue Type: Bug Components: streams Reporter: Alieh Saeedi Assignee: Alieh Saeedi Priority: Major Fix For: 4.0.0
>
> Re: https://forum.confluent.io/t/kafka-streams-timeout-during-partition-rebalance-seeking-insights-on-notleaderorfollowerexception/11362
> Calling Consumer.position() from Kafka Streams to compute the offset that must be committed suffers from a race condition, so that by the time we want to commit, the position may be gone.
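A short sketch of the pattern KIP-1094 enables, assuming the `ConsumerRecords#nextOffsets()` accessor it introduces: commit the offsets attached to the poll result instead of calling `Consumer.position()` later, which can race with a rebalance:

```java
import java.time.Duration;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class NextOffsetsCommit {
    static void pollAndCommit(Consumer<String, String> consumer) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        // ... process the records ...
        // The positions were captured together with the fetched records, so no
        // position() lookup is needed after a rebalance may have happened.
        Map<TopicPartition, OffsetAndMetadata> nextOffsets = records.nextOffsets();
        consumer.commitSync(nextOffsets);
    }
}
```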
[PR] KAFKA-18827: Initialize share group state persister impl [2/N]. [kafka]
smjn opened a new pull request, #18992: URL: https://github.com/apache/kafka/pull/18992

* In this PR, we provide an implementation of the initialize share group state RPC from the persister's perspective.
* Tests have been added wherever applicable.

Note: merge after https://github.com/apache/kafka/pull/18968
[PR] feat: add configurable max number of connectors [kafka]
jjaakola-aiven opened a new pull request, #18993: URL: https://github.com/apache/kafka/pull/18993

When Kafka Connect is offered as a managed service, there is a need to restrict the number of connectors allowed to run in the Connect cluster. Creating a high number of connectors will make the Connect cluster unresponsive.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
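A hypothetical sketch of the kind of guard such a limit implies; the config name and the herder wiring below are assumptions for illustration, not the PR's actual code:

```java
public class ConnectorLimitGuard {
    // Hypothetical worker config, e.g. "max.connectors".
    private final int maxConnectors;
    private int connectorCount;

    ConnectorLimitGuard(int maxConnectors) {
        this.maxConnectors = maxConnectors;
    }

    // Called before a connector is created; rejects the request once the
    // configured ceiling is reached instead of letting the cluster degrade.
    synchronized void checkCanCreateConnector(String name) {
        if (connectorCount >= maxConnectors) {
            throw new IllegalStateException("Cannot create connector " + name
                + ": the cluster already runs the configured maximum of "
                + maxConnectors + " connectors");
        }
        connectorCount++;
    }
}
```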
[jira] [Closed] (KAFKA-17622) Kafka Streams Timeout During Partition Rebalance
[ https://issues.apache.org/jira/browse/KAFKA-17622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alieh Saeedi closed KAFKA-17622.

> Kafka Streams Timeout During Partition Rebalance
> Key: KAFKA-17622 URL: https://issues.apache.org/jira/browse/KAFKA-17622
> Project: Kafka Issue Type: Bug Components: streams Reporter: Alieh Saeedi Assignee: Alieh Saeedi Priority: Major Fix For: 4.0.0
>
> Re: https://forum.confluent.io/t/kafka-streams-timeout-during-partition-rebalance-seeking-insights-on-notleaderorfollowerexception/11362
> Calling Consumer.position() from Kafka Streams to compute the offset that must be committed suffers from a race condition, so that by the time we want to commit, the position may be gone.
Re: [PR] KAFKA-18813: [1/N] ConsumerGroupHeartbeat API and ConsumerGroupDescribe API must check topic describe [kafka]
dajac commented on code in PR #18989: URL: https://github.com/apache/kafka/pull/18989#discussion_r1965113944

## core/src/main/scala/kafka/server/KafkaApis.scala:
@@ -2519,19 +2519,28 @@ class KafkaApis(val requestChannel: RequestChannel,
 def handleConsumerGroupHeartbeat(request: RequestChannel.Request): CompletableFuture[Unit] = {
   val consumerGroupHeartbeatRequest = request.body[ConsumerGroupHeartbeatRequest]
+  var future = CompletableFuture.completedFuture[Unit](())
   if (!isConsumerGroupProtocolEnabled()) {
     // The API is not supported by the "old" group coordinator (the default). If the
     // new one is not enabled, we fail directly here.
     requestHelper.sendMaybeThrottle(request, consumerGroupHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-    CompletableFuture.completedFuture[Unit](())
   } else if (!authHelper.authorize(request.context, READ, GROUP, consumerGroupHeartbeatRequest.data.groupId)) {
     requestHelper.sendMaybeThrottle(request, consumerGroupHeartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
-    CompletableFuture.completedFuture[Unit](())
+  } else if (consumerGroupHeartbeatRequest.data.subscribedTopicNames != null &&
+    !consumerGroupHeartbeatRequest.data.subscribedTopicNames.isEmpty) {
+    // Check the authorization if the subscribed topic names are provided.
+    // Clients are not allowed to see topics that are not authorized for Describe.
+    val authorizedTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,
+      consumerGroupHeartbeatRequest.data.subscribedTopicNames.asScala)(identity)
+    if (authorizedTopics.size < consumerGroupHeartbeatRequest.data.subscribedTopicNames.size) {
+      requestHelper.sendMaybeThrottle(request, consumerGroupHeartbeatRequest.getErrorResponse(Errors.TOPIC_AUTHORIZATION_FAILED.exception))

Review Comment: I wonder whether we should put a custom error message. Have you considered it?

## core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
@@ -70,7 +71,8 @@ private[group] class GroupCoordinatorAdapter(
 override def consumerGroupHeartbeat(
   context: RequestContext,
-  request: ConsumerGroupHeartbeatRequestData
+  request: ConsumerGroupHeartbeatRequestData,
+  authorizer: Optional[Authorizer]

Review Comment: This looks wrong to me. I would rather prefer to pass the `Authorizer` when the `GroupCoordinatorService` is constructed. Then, we can pass it to the `GroupMetadataManager`.

## core/src/main/scala/kafka/server/KafkaApis.scala:
@@ -2519,19 +2519,28 @@
+  var future = CompletableFuture.completedFuture[Unit](())

Review Comment: nit: I would prefer keeping the previous way in order to avoid this mutable variable.

## core/src/main/scala/kafka/server/KafkaApis.scala:
(same hunk as above, ending with:)
+    if (authorizedTopics.size < consumerGroupHeartbeatRequest.data.subscribedTopicNames.size) {
+      requestHelper.sendMaybeThrottle(request, consumerGroupHeartbeatRequest.getErrorResponse(Errors.TOPIC_AUTHORIZATION_FAILED.exception))
+    }

Review Comment: If we go in
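For readers unfamiliar with the check under review: the PR filters a heartbeat's subscribed topics by DESCRIBE permission and fails the request if any topic is filtered out. A simplified Java sketch against the public `Authorizer` API (the real code is Scala and goes through `AuthHelper.filterByAuthorized`):

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.Authorizer;

public class DescribeAuthorizationCheck {
    // Returns true only if the caller may DESCRIBE every subscribed topic, so
    // unauthorized topic names are never leaked through the error path.
    static boolean allTopicsDescribable(Authorizer authorizer,
                                        AuthorizableRequestContext context,
                                        List<String> subscribedTopics) {
        List<Action> actions = new ArrayList<>();
        for (String topic : subscribedTopics) {
            actions.add(new Action(
                AclOperation.DESCRIBE,
                new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL),
                1, true, true));
        }
        return authorizer.authorize(context, actions).stream()
            .allMatch(result -> result == AuthorizationResult.ALLOWED);
    }
}
```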
Re: [PR] KAFKA-18733: Implemented fetch ratio and partition acquire time metrics (3/N) [kafka]
apoorvmittal10 commented on code in PR #18959: URL: https://github.com/apache/kafka/pull/18959#discussion_r1965556248

## core/src/main/java/kafka/server/share/DelayedShareFetch.java:
@@ -62,9 +63,13 @@ public class DelayedShareFetch extends DelayedOperation {
 private final ReplicaManager replicaManager;
 private final BiConsumer exceptionHandler;
 private final PartitionMaxBytesStrategy partitionMaxBytesStrategy;
+private final ShareGroupMetrics shareGroupMetrics;
+private final Time time;
 // The topic partitions that need to be completed for the share fetch request are given by sharePartitions.
 // sharePartitions is a subset of shareFetchData. The order of insertion/deletion of entries in sharePartitions is important.
 private final LinkedHashMap sharePartitions;
+// Tracks the start time to acquire any share partition for a fetch request.
+private long acquireStartTimeMs;

Review Comment: Certainly you are not wrong, but I think the metric needs that wait time. Here is the comment by Jun from the discussion thread, and my response:

> 11. I was thinking that it would be useful to know if there is any tension when acquiring the share partition lock. This is not quite captured by FetchLockTimeMs. Perhaps we could add another metric that measures the amount of time that a request needs to wait before acquiring a lock.

> AM Response: I have added TopicPartitionsAcquireTimeMs, which can track the time spent by any share fetch request to acquire any share partition. It's hard to track the amount of time spent waiting for a single share partition, but we can track the time spent to acquire any.

My impression is that we should consider both the wait time and the share partition acquire time itself, to see how much time a request spends before successfully acquiring any lock and proceeding. If the metric's purpose is not reflected in the docs, let me know and I'll update them.
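A simplified sketch of the timing pattern under discussion, assuming the metric records wall-clock time between the start of an acquisition attempt and the first successful acquisition (names are illustrative, not the PR's code):

```java
import org.apache.kafka.common.utils.Time;

public class AcquireTimeTracker {
    private final Time time;
    // Start of the current acquisition attempt for a share fetch request.
    private long acquireStartTimeMs;

    AcquireTimeTracker(Time time) {
        this.time = time;
        this.acquireStartTimeMs = time.milliseconds();
    }

    // Called once at least one share partition has been acquired: records the
    // elapsed wait and restarts the clock for the next acquisition attempt.
    long recordAcquireTimeMs() {
        long now = time.milliseconds();
        long elapsedMs = now - acquireStartTimeMs;
        acquireStartTimeMs = now;
        return elapsedMs; // in the PR this value would feed ShareGroupMetrics
    }
}
```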
Re: [PR] KAFKA-18827: Initialize share state, share coordinator impl. [1/N] [kafka]
AndrewJSchofield commented on code in PR #18968: URL: https://github.com/apache/kafka/pull/18968#discussion_r1965580105

## share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
@@ -317,16 +319,12 @@ public CoordinatorResult wr
 CoordinatorRecord record = generateShareStateRecord(partitionData, key);
 // build successful response if record is correctly created
-WriteShareGroupStateResponseData responseData = new WriteShareGroupStateResponseData()
-    .setResults(
-        Collections.singletonList(
-            WriteShareGroupStateResponse.toResponseWriteStateResult(key.topicId(),
-                Collections.singletonList(
-                    WriteShareGroupStateResponse.toResponsePartitionResult(
-                        key.partition()
-                    ))
-            ))
-    );
+WriteShareGroupStateResponseData responseData = new WriteShareGroupStateResponseData().setResults(

Review Comment: I think the previous style of
```
new WriteShareGroupStateResponse()
    .setResults(
```
was more in line with the rest of the code you've written.

## share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
@@ -808,6 +811,106 @@ public CompletableFuture deleteState(RequestC
 }

+@Override
+public CompletableFuture initializeState(RequestContext context, InitializeShareGroupStateRequestData request) {
+    // Send an empty response if the coordinator is not active.
+    if (!isActive.get()) {
+        return CompletableFuture.completedFuture(
+            generateErrorInitStateResponse(
+                request,
+                Errors.COORDINATOR_NOT_AVAILABLE,
+                "Share coordinator is not available."
+            )
+        );
+    }
+
+    String groupId = request.groupId();
+    // Send an empty response if groupId is invalid.
+    if (isGroupIdEmpty(groupId)) {
+        log.error("Group id must be specified and non-empty: {}", request);
+        return CompletableFuture.completedFuture(
+            new InitializeShareGroupStateResponseData()
+        );
+    }
+
+    // Send an empty response if topic data is empty.
+    if (isEmpty(request.topics())) {
+        log.error("Topic Data is empty: {}", request);
+        return CompletableFuture.completedFuture(
+            new InitializeShareGroupStateResponseData()
+        );
+    }
+
+    // A map to store the futures for each topicId and partition.
+    Map futureMap = new HashMap<>();
+
+    // The request received here could have multiple keys of structure group:topic:partition. However,
+    // the initializeState method in ShareCoordinatorShard expects a single key in the request. Hence, we will
+    // be looping over the keys below and constructing new InitializeShareGroupStateRequestData objects to pass
+    // onto the shard method.
+
+    for (InitializeShareGroupStateRequestData.InitializeStateData topicData : request.topics()) {
+        Uuid topicId = topicData.topicId();
+        for (InitializeShareGroupStateRequestData.PartitionData partitionData : topicData.partitions()) {
+            SharePartitionKey coordinatorKey = SharePartitionKey.getInstance(request.groupId(), topicId, partitionData.partition());
+
+            InitializeShareGroupStateRequestData requestForCurrentPartition = new InitializeShareGroupStateRequestData()
+                .setGroupId(groupId)
+                .setTopics(List.of(new InitializeShareGroupStateRequestData.InitializeStateData()
+                    .setTopicId(topicId)
+                    .setPartitions(List.of(partitionData;
+
+            CompletableFuture initializeFuture = runtime.scheduleWriteOperation(
+                "initialize-share-group-state",
+                topicPartitionFor(coordinatorKey),
+                Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
+                coordinator -> coordinator.initializeState(requestForCurrentPartition)
+            ).exceptionally(deleteException ->

Review Comment: nit: initializeException?
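A self-contained sketch of the fan-out pattern the diff describes: split a multi-partition request into per-partition operations, attach per-partition error handling, and join the futures before assembling the response. The stand-in types below are illustrative, not the coordinator's real API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class PerPartitionFanOut {
    static CompletableFuture<List<String>> initializeAll(List<String> partitionKeys) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (String key : partitionKeys) {
            futures.add(CompletableFuture
                // Stand-in for runtime.scheduleWriteOperation(...) per partition.
                .supplyAsync(() -> "initialized " + key)
                // Per-partition failures become error results, not lost exceptions.
                .exceptionally(ex -> "failed " + key + ": " + ex.getMessage()));
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(v -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }
}
```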
Re: [PR] KAFKA-18733: Implemented fetch ratio and partition acquire time metrics (3/N) [kafka]
apoorvmittal10 commented on code in PR #18959: URL: https://github.com/apache/kafka/pull/18959#discussion_r1965593013

## core/src/main/java/kafka/server/share/DelayedShareFetch.java:
(same hunk as in the earlier comment)

Review Comment: I'm not sure what that means or what you want to capture in the metric. Can you please give me an example?
Re: [PR] KAFKA-18023: Enforcing Explicit Naming for Kafka Streams Internal Topics [kafka]
lucasbru commented on code in PR #18233: URL: https://github.com/apache/kafka/pull/18233#discussion_r1965533413

## streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
@@ -2289,4 +2298,45 @@ public WrappedProcessorSupplier wra
 processorWrapper.wrapProcessorSupplier(name, processorSupplier)
 );
 }
+
+public void addImplicitInternalNames(final InternalResourcesNaming internalResourcesNaming) {
+    implicitInternalNames.add(internalResourcesNaming);
+}
+
+public void checkUnprovidedNames() {
+    if (!implicitInternalNames.isEmpty()) {
+        final StringBuilder result = new StringBuilder();
+        final List changelogTopics = new ArrayList<>();
+        final List stateStores = new ArrayList<>();
+        final List repartitionTopics = new ArrayList<>();
+        for (final InternalResourcesNaming internalResourcesNaming : implicitInternalNames) {
+            if (!Utils.isBlank(internalResourcesNaming.changelogTopic())) {
+                changelogTopics.add(internalResourcesNaming.changelogTopic());
+            }
+            if (!Utils.isBlank(internalResourcesNaming.stateStore())) {
+                stateStores.add(internalResourcesNaming.stateStore());
+            }
+            if (!Utils.isBlank(internalResourcesNaming.repartitionTopic())) {
+                repartitionTopics.add(internalResourcesNaming.repartitionTopic());
+            }
+        }
+        if (!changelogTopics.isEmpty()) {
+            result.append(String.format("Following changelog topic(s) has not been named: %s%n", String.join(", ", changelogTopics)));
+        }
+        if (!stateStores.isEmpty()) {
+            result.append(String.format("Following state store(s) has not been named: %s%n", String.join(", ", stateStores)));
+        }
+        if (!repartitionTopics.isEmpty()) {
+            result.append(String.format("Following repartition topic(s) has not been named: %s%n", String.join(", ", repartitionTopics)));
+        }
+        if (ensureExplicitInternalResourceNaming) {
+            throw new TopologyException(result.toString());
+        } else {
+            log.warn("Enforce explicit naming for all internal resources is set to false. If you want" +

Review Comment: Sounds good, trying to shorten it a bit (just an idea):
```
Explicit naming for internal resources is currently disabled. If you want to enforce user-defined names for all internal resources, set "ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG" to true. Note: Changing internal resource names may require a full streams application reset for an already deployed application. Consult the documentation on naming operators for more details.
```
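For context on what the check enforces, a small Kafka Streams sketch that names its internal resources explicitly, so the repartition topic and the state store (and hence its changelog topic) keep stable, user-defined names:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class ExplicitNamingExample {
    static void build(StreamsBuilder builder) {
        KStream<String, String> stream = builder.stream("input-topic");
        stream
            // Naming the grouping names the repartition topic.
            .groupBy((key, value) -> value,
                Grouped.with("word-repartition", Serdes.String(), Serdes.String()))
            // Naming the store names the store and its changelog topic.
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("word-counts-store"));
    }
}
```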
Re: [PR] KAFKA-18733: Implemented fetch ratio and partition acquire time metrics (3/N) [kafka]
adixitconfluent commented on code in PR #18959: URL: https://github.com/apache/kafka/pull/18959#discussion_r1965590620

## core/src/main/java/kafka/server/share/DelayedShareFetch.java:
(same hunk as in the earlier comment)

Review Comment: Hmm, maybe we can do this: keep a global variable that tracks the time wasted on attempts to acquire the topic partition that did not succeed. Adding this change along with the change in my comment above should account for the extra time when acquisition was unsuccessful, without counting the time the request stays idle. Of course, once we successfully acquire some topic partitions and record the metric, we set that variable back to 0. Wdyt @apoorvmittal10
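A minimal sketch of the accumulate-and-reset idea in the comment above (illustrative, not the PR's code): failed acquisition attempts add to a running total, and a successful attempt records the total and clears it:

```java
public class AccumulatedWaitTracker {
    // Time spent on acquisition attempts that did not succeed.
    private long wastedWaitMs;

    void onFailedAttempt(long attemptMs) {
        wastedWaitMs += attemptMs;
    }

    // Returns the value to record in the metric: all failed attempts plus the
    // successful one, then resets so idle time between requests is not counted.
    long onSuccessfulAttempt(long attemptMs) {
        long totalMs = wastedWaitMs + attemptMs;
        wastedWaitMs = 0;
        return totalMs;
    }
}
```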
[PR] KAFKA-18813: [3/N] Client support for TopicAuthException in DescribeConsumerGroup path [kafka]
lianetm opened a new pull request, #18996: URL: https://github.com/apache/kafka/pull/18996

Support TopicAuthorizationException received in the DescribeConsumerGroup response. This is expected when the admin client does not have access to topics in the group and gets a generic authorization error with no topic names.
Re: [PR] KAFKA-18629: ShareGroupDeleteState admin client impl. [kafka]
smjn commented on code in PR #18928: URL: https://github.com/apache/kafka/pull/18928#discussion_r1965654383

## tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java:
@@ -208,6 +222,67 @@ public void describeGroups() throws ExecutionException, InterruptedException {
 }
 }
+
+Map deleteShareGroups() {
+    List shareGroupIds = listDetailedShareGroups();
+    List groupIds = opts.options.has(opts.allGroupsOpt)
+        ? shareGroupIds.stream().map(GroupListing::groupId).toList()
+        : opts.options.valuesOf(opts.groupOpt);
+
+    if (groupIds.isEmpty()) {
+        throw new IllegalArgumentException("--groups or --all-groups argument is mandatory");

Review Comment: @AndrewJSchofield It is already happening correctly. The util method `getShareGroupService` was not performing the validation check, so I missed it and added it in the wrong place. I will rectify.
Re: [PR] MINOR: Update upgrade notes for 4.0.0 [kafka]
ijuma commented on PR #18960: URL: https://github.com/apache/kafka/pull/18960#issuecomment-2674814027 @chia7712 Can you be a bit more specific regarding what you mean by zk migration docs? Some of them were already removed from 4.0/trunk.
Re: [PR] MINOR: Move the ELR default version to 4.1 [kafka]
dajac commented on PR #18954: URL: https://github.com/apache/kafka/pull/18954#issuecomment-2674815425 Merged to trunk and to 4.0.
Re: [PR] MINOR: Update upgrade notes for 4.0.0 [kafka]
chia7712 commented on PR #18960: URL: https://github.com/apache/kafka/pull/18960#issuecomment-2674832807 @ijuma Sorry, I traced the wrong branch. #17813 already removed them.
Re: [PR] KAFKA-17125 Add integration test for StreamsGroup in Admin API [kafka]
lucasbru merged PR #18911: URL: https://github.com/apache/kafka/pull/18911
Re: [PR] MINOR: Move the ELR default version to 4.1 [kafka]
dajac merged PR #18954: URL: https://github.com/apache/kafka/pull/18954
[jira] [Assigned] (KAFKA-18844) Fix flaky QuorumControllerTest.testBalancePartitionLeaders()
[ https://issues.apache.org/jira/browse/KAFKA-18844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] PoAn Yang reassigned KAFKA-18844:
Assignee: PoAn Yang

> Fix flaky QuorumControllerTest.testBalancePartitionLeaders()
> Key: KAFKA-18844 URL: https://issues.apache.org/jira/browse/KAFKA-18844
> Project: Kafka Issue Type: Bug Reporter: David Jacot Assignee: PoAn Yang Priority: Major
>
> This test has been flaky for a few days. See https://develocity.apache.org/scans/tests?search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.timeZoneId=Europe%2FZurich&tests.container=org.apache.kafka.controller.QuorumControllerTest&tests.test=testBalancePartitionLeaders(). We need to investigate and fix it.
Re: [PR] KAFKA-18629: ShareGroupDeleteState admin client impl. [kafka]
smjn commented on PR #18928: URL: https://github.com/apache/kafka/pull/18928#issuecomment-2674866046 @AndrewJSchofield Thanks for the review; I have incorporated the comments.
[jira] [Updated] (KAFKA-18843) MirrorMaker2 workerId is not unique; the same ID is used for all workers
[ https://issues.apache.org/jira/browse/KAFKA-18843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-18843:
Fix Version/s: (was: 4.0.0)

> MirrorMaker2 workerId is not unique; the same ID is used for all workers
> Key: KAFKA-18843 URL: https://issues.apache.org/jira/browse/KAFKA-18843
> Project: Kafka Issue Type: Bug Components: mirrormaker Affects Versions: 4.0.0, 3.7.0, 3.8.0, 3.9.0 Reporter: Bertalan Kondrat Assignee: Bertalan Kondrat Priority: Major
>
> MirrorMaker2 uses `sourceAndTarget.toString()` as the workerId, which makes all workers' IDs the same across the distributed herder.
> https://github.com/apache/kafka/blob/d31cbf59dee6a77c1c673db06900ecc238894ed7/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java#L243
>
> In our experience this does not cause any functional issues, but it renders the workerId in the status topic messages and in the REST responses useless or misleading.
Re: [PR] KAFKA-18844: Fix flaky QuorumControllerTest.testBalancePartitionLeaders() [kafka]
ijuma commented on PR #18997: URL: https://github.com/apache/kafka/pull/18997#issuecomment-2674865005 This is related to #18845, thanks for the quick fix!
Re: [PR] KAFKA-18844: Fix flaky QuorumControllerTest.testBalancePartitionLeaders() [kafka]
ijuma commented on PR #18997: URL: https://github.com/apache/kafka/pull/18997#issuecomment-2674872246

> in QuorumController [0], we don't set bootstrapMetadata for it, so it uses Metadata.latestProduction()

We tried to remove these defaults as part of #18845; it looks like I missed this one. Can you try removing the default? Is that an easy change, or does it cause a lot of issues?
[jira] [Updated] (KAFKA-18507) Remove ControllerChannelManager
[ https://issues.apache.org/jira/browse/KAFKA-18507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-18507:
Fix Version/s: (was: 4.0.0)

> Remove ControllerChannelManager
> Key: KAFKA-18507 URL: https://issues.apache.org/jira/browse/KAFKA-18507
> Project: Kafka Issue Type: Bug Reporter: TengYao Chi Assignee: TengYao Chi Priority: Major
>
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
[jira] [Updated] (KAFKA-18527) SaslClientsWithInvalidCredentialsTest.java and SaslClientsWithInvalidCredentialsTest.scala should run for async consumer
[ https://issues.apache.org/jira/browse/KAFKA-18527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-18527:
Fix Version/s: (was: 4.0.0)

> SaslClientsWithInvalidCredentialsTest.java and SaslClientsWithInvalidCredentialsTest.scala should run for the async consumer
> Key: KAFKA-18527 URL: https://issues.apache.org/jira/browse/KAFKA-18527
> Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: 黃竣陽 Assignee: TengYao Chi Priority: Blocker Labels: kip-848-client-support
>
> All tests in the file are parametrized to run for Classic only, so they won't run with the new async consumer. We should enable them so that they run with both consumers.
[jira] [Updated] (KAFKA-18526) CustomQuotaCallbackTest should run for async consumer
[ https://issues.apache.org/jira/browse/KAFKA-18526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-18526:
Fix Version/s: (was: 4.0.0)

> CustomQuotaCallbackTest should run for the async consumer
> Key: KAFKA-18526 URL: https://issues.apache.org/jira/browse/KAFKA-18526
> Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: 黃竣陽 Assignee: 黃竣陽 Priority: Blocker Labels: kip-848-client-support
>
> All tests in the file are parametrized to run for Classic only, so they won't run with the new async consumer. We should enable them so that they run with both consumers.
[jira] [Updated] (KAFKA-18554) Cleanup the doc of SocketServerConfigs#ADVERTISED_LISTENERS_CONFIG
[ https://issues.apache.org/jira/browse/KAFKA-18554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-18554:
Fix Version/s: (was: 4.0.0)

> Cleanup the doc of SocketServerConfigs#ADVERTISED_LISTENERS_CONFIG
> Key: KAFKA-18554 URL: https://issues.apache.org/jira/browse/KAFKA-18554
> Project: Kafka Issue Type: Sub-task Reporter: TengYao Chi Assignee: TengYao Chi Priority: Major
>
> The doc of `advertised.listeners` still contains ZooKeeper-related statements; we should remove them.
[jira] [Updated] (KAFKA-18548) Cleanup KRaftConfig documentation
[ https://issues.apache.org/jira/browse/KAFKA-18548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-18548:
Fix Version/s: (was: 4.0.0)

> Cleanup KRaftConfig documentation
> Key: KAFKA-18548 URL: https://issues.apache.org/jira/browse/KAFKA-18548
> Project: Kafka Issue Type: Sub-task Reporter: TengYao Chi Assignee: TengYao Chi Priority: Major
>
> We no longer need the ZooKeeper-related descriptions since ZooKeeper is being removed.
[jira] [Updated] (KAFKA-18643) Set terminal version for ApiVersionsResponse#ZkMigrationReady
[ https://issues.apache.org/jira/browse/KAFKA-18643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-18643:
Fix Version/s: (was: 4.0.0)

> Set terminal version for ApiVersionsResponse#ZkMigrationReady
> Key: KAFKA-18643 URL: https://issues.apache.org/jira/browse/KAFKA-18643
> Project: Kafka Issue Type: Bug Reporter: David Arthur Priority: Minor
>
> This field (ZkMigrationReady in ApiVersionsResponse) was last used in 3.6. Starting in 3.7, we used the controller registration RPC in lieu of this field.
> Since 4.0 will never write this field, we can remove code usages and set the current ApiVersionsResponse version as the terminal version for this field.
[jira] [Updated] (KAFKA-12825) Remove Deprecated method StreamsBuilder#addGlobalStore
[ https://issues.apache.org/jira/browse/KAFKA-12825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-12825:
Fix Version/s: (was: 4.0.0)

> Remove Deprecated method StreamsBuilder#addGlobalStore
> Key: KAFKA-12825 URL: https://issues.apache.org/jira/browse/KAFKA-12825
> Project: Kafka Issue Type: Sub-task Components: streams Reporter: Josep Prat Priority: Blocker
>
> Methods:
> org.apache.kafka.streams.scala.StreamsBuilder#addGlobalStore
> org.apache.kafka.streams.StreamsBuilder#addGlobalStore(org.apache.kafka.streams.state.StoreBuilder, java.lang.String, org.apache.kafka.streams.kstream.Consumed, org.apache.kafka.streams.processor.ProcessorSupplier)
> were deprecated in 2.7.
> See KAFKA-10379 and KIP-478.
[jira] [Updated] (KAFKA-12831) Remove Deprecated method StateStore#init
[ https://issues.apache.org/jira/browse/KAFKA-12831?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-12831:
Fix Version/s: (was: 4.0.0)

> Remove Deprecated method StateStore#init
> Key: KAFKA-12831 URL: https://issues.apache.org/jira/browse/KAFKA-12831
> Project: Kafka Issue Type: Sub-task Components: streams Reporter: Josep Prat Priority: Blocker
>
> The method org.apache.kafka.streams.processor.StateStore#init(org.apache.kafka.streams.processor.ProcessorContext, org.apache.kafka.streams.processor.StateStore) was deprecated in version 2.7.
> See KAFKA-10562 and KIP-478.
[PR] KAFKA-18844: Fix flaky QuorumControllerTest.testBalancePartitionLeaders() [kafka]
FrankYang0529 opened a new pull request, #18997: URL: https://github.com/apache/kafka/pull/18997

When we initialize `FeatureControlManager` in `QuorumController` [0], we don't set `bootstrapMetadata` for it, so it uses `Metadata.latestProduction()` [1]. In the test case, we use `IBP_3_7_IV0` as `bootstrapMetadata` [2]. When initializing `QuorumController`, it takes time to replay records and overwrite `FeatureControlManager#metadataVersion`. If we add `Thread.sleep(1000L)` before `metadataVersion.set(mv)` [3], we can reproduce the flaky test reliably.

I'm not sure whether setting `bootstrapMetadata` on `FeatureControlManager` is correct, so I fix the test by waiting until `metadataVersion` has been overwritten.

[0] https://github.com/apache/kafka/blob/8f13e7c20737400a2b7b93449c20146089df00a6/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java#L1536-L1541
[1] https://github.com/apache/kafka/blob/8f13e7c20737400a2b7b93449c20146089df00a6/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java#L57
[2] https://github.com/apache/kafka/blob/8f13e7c20737400a2b7b93449c20146089df00a6/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java#L177-L178
[3] https://github.com/apache/kafka/blob/8f13e7c20737400a2b7b93449c20146089df00a6/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java#L371-L379

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
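A sketch of the waiting fix described above, using Kafka's test utility for polling conditions; the controller accessor is a hypothetical stand-in for however the test reads the active metadata version:

```java
import org.apache.kafka.test.TestUtils;

public class AwaitBootstrapMetadata {
    // Hypothetical accessor, for illustration only.
    interface ControllerHandle {
        short metadataVersionLevel();
    }

    // Poll until the bootstrap records have been replayed so the test no
    // longer races the controller's startup.
    static void awaitMetadataVersion(ControllerHandle controller, short expectedLevel)
            throws InterruptedException {
        TestUtils.waitForCondition(
            () -> controller.metadataVersionLevel() == expectedLevel,
            "Timed out waiting for bootstrap metadata.version to be replayed");
    }
}
```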
Re: [PR] MINOR: Move the ELR default version to 4.1 [kafka]
ijuma commented on PR #18954: URL: https://github.com/apache/kafka/pull/18954#issuecomment-2674861637 Interesting, the flaky test looks related to #18845. I added a comment to the JIRA ticket too.
[jira] [Updated] (KAFKA-13027) Support for Jakarta EE 9.x to allow applications to migrate
[ https://issues.apache.org/jira/browse/KAFKA-13027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-13027:
Fix Version/s: (was: 4.0.0)

> Support for Jakarta EE 9.x to allow applications to migrate
> Key: KAFKA-13027 URL: https://issues.apache.org/jira/browse/KAFKA-13027
> Project: Kafka Issue Type: Improvement Affects Versions: 2.8.0 Reporter: Frode Carlsen Priority: Major
>
> Some of the Kafka libraries (such as connect-api) have direct dependencies on older Java EE 8 specifications (e.g. javax.ws.rs:javax.ws.rs-api:2.1.1).
> This creates issues in environments upgrading to Jakarta 9.0 and beyond (9.1 requires Java 11 at minimum), for example when upgrading web application servers such as Jetty 11. The main thing preventing backwards compatibility is that the package namespace has moved from "javax.*" to "jakarta.*", along with a few namespace changes in XML configuration files. (New specifications are published at https://jakarta.ee/specifications/, along with references to official artifacts and compliant implementations.)
> From KAFKA-12894 (KIP-705) it appears dropping support for Java 8 won't happen till Q4 2022, which makes it harder to migrate to Jakarta 9.1, but 9.0 is still Java 8 compatible.
> Therefore, to allow projects that use Kafka client libraries to migrate before the full work is completed in a future Kafka version, would it be possible to generate Jakarta 9 compatible artifacts and dual-publish them for libraries that currently depend on javax.ws.rs / javax.servlet and similar? This is done by a number of open source libraries as an alternative to maintaining separate release branches for the time being. Other than the namespace change in 9.0 and the minimum Java LTS version in 9.1, the APIs are fully compatible with Java EE 8.
> As a suggestion, this is fairly easy to do automatically using https://github.com/eclipse/transformer/ for migration (most projects end up publishing artifacts with either a "-jakarta" suffix on the artifactId or a classifier).
[jira] [Updated] (KAFKA-12843) KIP-740 follow up: clean up TaskMetadata
[ https://issues.apache.org/jira/browse/KAFKA-12843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-12843:
Fix Version/s: (was: 4.0.0)

> KIP-740 follow up: clean up TaskMetadata
> Key: KAFKA-12843 URL: https://issues.apache.org/jira/browse/KAFKA-12843
> Project: Kafka Issue Type: Sub-task Components: streams Reporter: A. Sophie Blee-Goldman Assignee: João Pedro Fonseca Priority: Blocker
>
> See KIP-740 (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306557). For the TaskMetadata class, we need to:
> 1. Deprecate the TaskMetadata#getTaskId method
> 2. "Remove" the deprecated TaskMetadata#taskId method, then re-add a taskId() API that returns a TaskId instead of a String
> 3. Remove the deprecated constructor
[jira] [Updated] (KAFKA-15284) Implement GroupProtocolResolver to dynamically determine consumer group protocol
[ https://issues.apache.org/jira/browse/KAFKA-15284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-15284:
Fix Version/s: (was: 4.0.0)

> Implement GroupProtocolResolver to dynamically determine consumer group protocol
> Key: KAFKA-15284 URL: https://issues.apache.org/jira/browse/KAFKA-15284
> Project: Kafka Issue Type: New Feature Components: clients, consumer Reporter: Kirk True Assignee: Kirk True Priority: Critical Labels: kip-848-client-support
>
> At client initialization, we need to determine which of the {{ConsumerDelegate}} implementations to use:
> 1. {{LegacyKafkaConsumerDelegate}}
> 2. {{AsyncKafkaConsumerDelegate}}
> There are conditions defined by KIP-848 that determine client eligibility to use the new protocol. This will be modeled by the {{GroupProtocolResolver}}.
> Known tasks:
> * Determine at what point in the {{Consumer}} initialization the network communication should happen
> * Determine what RPCs to invoke in order to determine eligibility (API versions, IBP version, etc.)
> * Implement the network client lifecycle (startup, communication, shutdown, etc.)
> * Determine the fall-back path in case the client is not eligible to use the protocol
[jira] [Updated] (KAFKA-12833) Remove Deprecated methods under TopologyTestDriver
[ https://issues.apache.org/jira/browse/KAFKA-12833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-12833:
Fix Version/s: (was: 4.0.0)

> Remove Deprecated methods under TopologyTestDriver
> Key: KAFKA-12833 URL: https://issues.apache.org/jira/browse/KAFKA-12833
> Project: Kafka Issue Type: Sub-task Components: streams-test-utils Reporter: Josep Prat Assignee: Arnav Dadarya Priority: Blocker
>
> The following methods were at least deprecated in 2.8:
> * org.apache.kafka.streams.TopologyTestDriver.KeyValueStoreFacade#init(org.apache.kafka.streams.processor.ProcessorContext, org.apache.kafka.streams.processor.StateStore)
> * org.apache.kafka.streams.TopologyTestDriver.WindowStoreFacade#init(org.apache.kafka.streams.processor.ProcessorContext, org.apache.kafka.streams.processor.StateStore)
>
> Disclaimer: these methods might have been deprecated for longer, but they were definitely moved to this new "hierarchy position" in version 2.8.
> The move from standalone classes to inner classes was done under KAFKA-12435.
[jira] [Updated] (KAFKA-14560) Remove old client protocol API versions in Kafka 4.0 (KIP-896)
[ https://issues.apache.org/jira/browse/KAFKA-14560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-14560:
Fix Version/s: 4.1.0 (was: 4.0.0)

> Remove old client protocol API versions in Kafka 4.0 (KIP-896)
> Key: KAFKA-14560 URL: https://issues.apache.org/jira/browse/KAFKA-14560
> Project: Kafka Issue Type: Improvement Reporter: Ismael Juma Assignee: Ismael Juma Priority: Blocker Fix For: 4.1.0
>
> Please see the KIP for details: https://cwiki.apache.org/confluence/display/KAFKA/KIP-896%3A+Remove+old+client+protocol+API+versions+in+Kafka+4.0
[jira] [Updated] (KAFKA-16442) Update streams_standby_replica_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-16442: Fix Version/s: (was: 4.0.0) > Update streams_standby_replica_test.py to support KIP-848’s group protocol > config > - > > Key: KAFKA-16442 > URL: https://issues.apache.org/jira/browse/KAFKA-16442 > Project: Kafka > Issue Type: Test > Components: clients, consumer, streams, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Priority: Blocker > Labels: kip-848-client-support, system-tests > > This task is to update the test method(s) in > {{streams_standby_replica_test.py}} to support the {{group.protocol}} > configuration introduced in > [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] > by adding an optional {{group_protocol}} argument to the tests and matrixes. > See KAFKA-16231 as an example of how the test parameters can be changed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17775) Remove Java#IS_JAVA9_COMPATIBLE property
[ https://issues.apache.org/jira/browse/KAFKA-17775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-17775: Fix Version/s: (was: 4.0.0) > Remove Java#IS_JAVA9_COMPATIBLE property > > > Key: KAFKA-17775 > URL: https://issues.apache.org/jira/browse/KAFKA-17775 > Project: Kafka > Issue Type: Sub-task >Reporter: 黃竣陽 >Assignee: 黃竣陽 >Priority: Major > > Because Java 8 is deprecated in Kafka, we no longer need this check. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14560) Remove old client protocol API versions in Kafka 4.0 (KIP-896)
[ https://issues.apache.org/jira/browse/KAFKA-14560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17929180#comment-17929180 ] David Jacot commented on KAFKA-14560: - Temporarily moving it to 4.1 so I can cut an RC for 4.0. > Remove old client protocol API versions in Kafka 4.0 (KIP-896) > -- > > Key: KAFKA-14560 > URL: https://issues.apache.org/jira/browse/KAFKA-14560 > Project: Kafka > Issue Type: Improvement >Reporter: Ismael Juma >Assignee: Ismael Juma >Priority: Blocker > Fix For: 4.1.0 > > > Please see KIP for details: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-896%3A+Remove+old+client+protocol+API+versions+in+Kafka+4.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17307) Remove junit-platform.properties from test JARs
[ https://issues.apache.org/jira/browse/KAFKA-17307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-17307: Fix Version/s: (was: 4.0.0) > Remove junit-platform.properties from test JARs > --- > > Key: KAFKA-17307 > URL: https://issues.apache.org/jira/browse/KAFKA-17307 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.7.1 >Reporter: Cosimo Damiano Prete >Priority: Critical > > Hello. > When using Kafka in tests, some {{test(-test)}} JARs are pulled which contain > a {{junit-platform.properties}} that clashes with the one I already have on the > classpath. > I've built a minimal reproducible example in > [https://github.com/cdprete/kafka-junit-platform-props]. > Would it be possible to remove these {{test(-test)}} JARs or, at least, > the {{junit-platform.properties}} files? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16623) KafkaAsyncConsumer system tests warn about revoking partitions that weren't previously assigned
[ https://issues.apache.org/jira/browse/KAFKA-16623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-16623: Fix Version/s: (was: 4.0.0) > KafkaAsyncConsumer system tests warn about revoking partitions that weren't > previously assigned > --- > > Key: KAFKA-16623 > URL: https://issues.apache.org/jira/browse/KAFKA-16623 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.8.0 >Reporter: Kirk True >Assignee: Lianet Magrans >Priority: Critical > Labels: consumer-threading-refactor, kip-848-client-support, > system-tests > > When running system tests for the KafkaAsyncConsumer, we occasionally see > this warning: > {noformat} > File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner > self.run() > File "/usr/lib/python3.7/threading.py", line 865, in run > self._target(*self._args, **self._kwargs) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py", > line 38, in _protected_worker > self._worker(idx, node) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py", > line 304, in _worker > handler.handle_partitions_revoked(event, node, self.logger) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py", > line 163, in handle_partitions_revoked > (tp, node.account.hostname) > AssertionError: Topic partition TopicPartition(topic='test_topic', > partition=0) cannot be revoked from worker20 as it was not previously > assigned to that consumer > {noformat} > In test_fencing_static_consumer, there are two sets of consumers that use > group instance IDs: the initial set and the "conflict" set. It appears that > one of the "conflicting" consumers hijacks the partition ownership from the > coordinator's perspective when the initial consumer leaves the group. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14517) Implement regex subscriptions
[ https://issues.apache.org/jira/browse/KAFKA-14517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-14517: Fix Version/s: (was: 4.0.0) > Implement regex subscriptions > - > > Key: KAFKA-14517 > URL: https://issues.apache.org/jira/browse/KAFKA-14517 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-preview > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17183) New consumer system tests pass for subset of tests, but fail if running all tests
[ https://issues.apache.org/jira/browse/KAFKA-17183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-17183: Fix Version/s: (was: 4.0.0) > New consumer system tests pass for subset of tests, but fail if running all > tests > - > > Key: KAFKA-17183 > URL: https://issues.apache.org/jira/browse/KAFKA-17183 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.8.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: consumer-threading-refactor, system-tests > > In 3.8, many of the system tests were updated to exercise both the old > consumer and the new consumer. For quicker feedback, I have been creating a > YAML-based test suite for just the subset of tests that were updated. When > running the system tests in the subset, I am consistently seeing 100% pass > rate. Recently I started running the entire test suite, and are now seeing > many failures. > The cause is as yet unknown, but adding this here as a task to fix whatever > is causing this behavior. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16441) Update streams_broker_down_resilience_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-16441: Fix Version/s: (was: 4.0.0) > Update streams_broker_down_resilience_test.py to support KIP-848’s group > protocol config > > > Key: KAFKA-16441 > URL: https://issues.apache.org/jira/browse/KAFKA-16441 > Project: Kafka > Issue Type: Test > Components: clients, consumer, streams, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Priority: Blocker > Labels: kip-848-client-support, system-tests > > This task is to update the test method(s) in > {{streams_broker_down_resilience_test.py}} to support the {{group.protocol}} > configuration introduced in > [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] > by adding an optional {{group_protocol}} argument to the tests and matrixes. > See KAFKA-16231 as an example of how the test parameters can be changed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-18309) Add ConsumerGroupRegularExpression record to dump-log tool
[ https://issues.apache.org/jira/browse/KAFKA-18309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-18309: Fix Version/s: (was: 4.0.0) > Add ConsumerGroupRegularExpression record to dump-log tool > -- > > Key: KAFKA-18309 > URL: https://issues.apache.org/jira/browse/KAFKA-18309 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: David Jacot >Priority: Blocker > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17892) Update README after migration to log4j2
[ https://issues.apache.org/jira/browse/KAFKA-17892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-17892: Fix Version/s: (was: 4.0.0) > Update README after migration to log4j2 > --- > > Key: KAFKA-17892 > URL: https://issues.apache.org/jira/browse/KAFKA-17892 > Project: Kafka > Issue Type: Improvement >Reporter: TengYao Chi >Assignee: TengYao Chi >Priority: Minor > > see discussion: > https://github.com/apache/kafka/pull/17373#discussion_r1813405127 > Since the current example of log4j in the README relies on a file in the > trunk branch, we should update the README after migrating to log4j2. > https://github.com/apache/kafka/blob/984777f0b952b6e1629d3b16ce1f196d54278c3e/README.md?plain=1#L57 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17681) Fix unstable consumer_test.py#test_fencing_static_consumer
[ https://issues.apache.org/jira/browse/KAFKA-17681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-17681: Fix Version/s: (was: 4.0.0) > Fix unstable consumer_test.py#test_fencing_static_consumer > -- > > Key: KAFKA-17681 > URL: https://issues.apache.org/jira/browse/KAFKA-17681 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > Labels: flaky-test, kip-848-client-support > > {code:java} > AssertionError('Static consumers attempt to join with instance id in use > should not cause a rebalance.') > Traceback (most recent call last): > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 186, in _do_run > data = self.run_test() > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 246, in run_test > return self.test_context.function(self.test) > File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line > 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line > 359, in test_fencing_static_consumer > assert num_rebalances == consumer.num_rebalances(), "Static consumers > attempt to join with instance id in use should not cause a rebalance. before: > " + str(num_rebalances) + " after: " + str(consumer.num_rebalances()) > AssertionError: Static consumers attempt to join with instance id in use > should not cause a rebalance. > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-18733: Implemented fetch ratio and partition acquire time metrics (3/N) [kafka]
adixitconfluent commented on code in PR #18959: URL: https://github.com/apache/kafka/pull/18959#discussion_r1965707920 ## core/src/main/java/kafka/server/share/DelayedShareFetch.java: ## @@ -62,9 +63,13 @@ public class DelayedShareFetch extends DelayedOperation { private final ReplicaManager replicaManager; private final BiConsumer exceptionHandler; private final PartitionMaxBytesStrategy partitionMaxBytesStrategy; +private final ShareGroupMetrics shareGroupMetrics; +private final Time time; // The topic partitions that need to be completed for the share fetch request are given by sharePartitions. // sharePartitions is a subset of shareFetchData. The order of insertion/deletion of entries in sharePartitions is important. private final LinkedHashMap sharePartitions; +// Tracks the start time to acquire any share partition for a fetch request. +private long acquireStartTimeMs; Review Comment: so basically the global variable would be the addition of time spent to make a call to `acquirablePartitions` from tryComplete when we are unable to acquire any partitions and then using it once we are finally able to acquire any partitions. That should give us the total time spent in contention for topic partitions. Having said that, these times would be minimal (since they are just function calls) and won't give a high enough time for us to judge contention. So, I think your current approach should work correctly in giving the time for contention for topic partitions. However, if tryComplete is not called too often (due to a bigger problem in the system), then we won't have contention time. We would be getting the wait time of requests in that scenario. But, I agree that scenario is extremely rare. Maybe, we can change the doc in the KIP for this metric to better reflect the meaning in exceptional scenarios. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
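A minimal sketch of the accumulation idea discussed above: sum the time spent in unsuccessful acquire attempts across tryComplete calls and report the total as the contention time once partitions are finally acquired. The class and method names here are hypothetical, not the actual DelayedShareFetch implementation.

{code:java}
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: accumulate the time spent in unsuccessful acquire
// attempts and report the total as contention time once partitions are
// finally acquired.
public class AcquireTimeTracker {
    private long accumulatedWaitNanos = 0L;
    private long attemptStartNanos = -1L;

    // Call at the start of each tryComplete attempt.
    public void onAttemptStart() {
        attemptStartNanos = System.nanoTime();
    }

    // Call when the attempt failed to acquire any share partition.
    public void onAttemptFailed() {
        if (attemptStartNanos >= 0) {
            accumulatedWaitNanos += System.nanoTime() - attemptStartNanos;
            attemptStartNanos = -1L;
        }
    }

    // Call once partitions are acquired; returns the total contention time in ms.
    public long onAcquired() {
        onAttemptFailed(); // fold the final (successful) attempt into the total
        long totalMs = TimeUnit.NANOSECONDS.toMillis(accumulatedWaitNanos);
        accumulatedWaitNanos = 0L;
        return totalMs;
    }
}
{code}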
[jira] [Updated] (KAFKA-17780) Heartbeat interval is not configured in the heartbeatrequest manager
[ https://issues.apache.org/jira/browse/KAFKA-17780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-17780: Fix Version/s: (was: 4.0.0) > Heartbeat interval is not configured in the heartbeatrequest manager > > > Key: KAFKA-17780 > URL: https://issues.apache.org/jira/browse/KAFKA-17780 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, kip >Reporter: Arpit Goyal >Priority: Critical > Labels: kip-848-client-support > > In the AbstractHeartBeatRequestManager , I observed we are not setting the > right heartbeat request interval. Is this intentional ? > [~lianetm] [~kirktrue] > {code:java} > long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); > long retryBackoffMaxMs = > config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); > this.heartbeatRequestState = new HeartbeatRequestState(logContext, > time, 0, retryBackoffMs, > retryBackoffMaxMs, maxPollIntervalMs); > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-18481) Remove ZookeeperClient, KeeperException, Code
[ https://issues.apache.org/jira/browse/KAFKA-18481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-18481: Fix Version/s: (was: 4.0.0) > Remove ZookeeperClient, KeeperException, Code > - > > Key: KAFKA-18481 > URL: https://issues.apache.org/jira/browse/KAFKA-18481 > Project: Kafka > Issue Type: Improvement >Reporter: TengYao Chi >Assignee: TengYao Chi >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-18827: Initialize share state, share coordinator impl. [1/N] [kafka]
smjn commented on code in PR #18968: URL: https://github.com/apache/kafka/pull/18968#discussion_r1965706156 ## share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java: ## @@ -317,16 +319,12 @@ public CoordinatorResult wr CoordinatorRecord record = generateShareStateRecord(partitionData, key); // build successful response if record is correctly created -WriteShareGroupStateResponseData responseData = new WriteShareGroupStateResponseData() -.setResults( -Collections.singletonList( - WriteShareGroupStateResponse.toResponseWriteStateResult(key.topicId(), -Collections.singletonList( - WriteShareGroupStateResponse.toResponsePartitionResult( -key.partition() -)) -)) -); +WriteShareGroupStateResponseData responseData = new WriteShareGroupStateResponseData().setResults( Review Comment: We will need to refactor all the code; this is the acceptable style, since the previous one causes alignment issues. I have a JIRA for the same. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-18204) Upgrade to rocksdb 8.x+ (ideally 9.x)
[ https://issues.apache.org/jira/browse/KAFKA-18204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-18204: Fix Version/s: (was: 4.0.0) > Upgrade to rocksdb 8.x+ (ideally 9.x) > - > > Key: KAFKA-18204 > URL: https://issues.apache.org/jira/browse/KAFKA-18204 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Radha Krishna Peteti >Assignee: Suresh Kumar >Priority: Major > > Kafka still uses rocksdbjni version 7.x (ref: > [https://github.com/apache/kafka/blob/trunk/gradle/dependencies.gradle#L120]) > which is no longer receiving backports from upstream. > Please update to rocksdb version 9.x (latest version) so that security > updates are received. > Examples for critical vulnerabilities (CVE score 9.8) in rocksdb version 7.x: > [https://nvd.nist.gov/vuln/detail/CVE-2023-45853] > [https://nvd.nist.gov/vuln/detail/CVE-2022-37434] > (updating to the tip of 8.x release fixes these two vulnerabilities but for > any new security fixes, we will need to move to 9.x) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-18176) Consumer system tests failure due to unsupported protocol configs
[ https://issues.apache.org/jira/browse/KAFKA-18176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-18176: Fix Version/s: (was: 4.0.0) > Consumer system tests failure due to unsupported protocol configs > - > > Key: KAFKA-18176 > URL: https://issues.apache.org/jira/browse/KAFKA-18176 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > > Consumer system tests are failing with the validation that Classic protocol > configs cannot be used with the new consumer/protocol (validation added with > https://issues.apache.org/jira/browse/KAFKA-17338) > > OffsetValidationTest/test_consumer_failure/clean_shutdown=True.enable_autocommit=False.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer/1/VerifiableConsumer-0-140207405726800/ducker05/verifiable_consumer.stdout:Exception > in thread "main" org.apache.kafka.common.config.ConfigException: > session.timeout.ms cannot be set when group.protocol=CONSUMER -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16891) KIP-1043: Administration of groups
[ https://issues.apache.org/jira/browse/KAFKA-16891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17929182#comment-17929182 ] David Jacot commented on KAFKA-16891: - Removing 4.0 from the fix versions because we cannot release with open jiras associated to the version that we release. > KIP-1043: Administration of groups > -- > > Key: KAFKA-16891 > URL: https://issues.apache.org/jira/browse/KAFKA-16891 > Project: Kafka > Issue Type: New Feature >Reporter: Andrew Schofield >Assignee: Andrew Schofield >Priority: Major > Fix For: 4.1.0 > > > This issue tracks the development of KIP-1043: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1043%3A+Administration+of+groups. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16092) Queues for Kafka
[ https://issues.apache.org/jira/browse/KAFKA-16092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-16092: Fix Version/s: (was: 4.0.0) (was: 3.8.0) (was: 3.9.0) > Queues for Kafka > > > Key: KAFKA-16092 > URL: https://issues.apache.org/jira/browse/KAFKA-16092 > Project: Kafka > Issue Type: New Feature >Reporter: Andrew Schofield >Assignee: Andrew Schofield >Priority: Major > Labels: queues-for-kafka > Fix For: 4.1.0 > > Attachments: image-2024-04-28-11-05-56-153.png > > > This Jira tracks the development of KIP-932: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] MINOR: Update release script for 4.0 [kafka]
dajac opened a new pull request, #18999: URL: https://github.com/apache/kafka/pull/18999 This patch updates the release script to use JDK 21 to build the release. We could also use JDK 17 but using JDK 21 directly does not change much. We have to verify anyway that the server works with 17 and the client with 11. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18748 Run new tests separately in PRs [kafka]
mumrah commented on PR #18770: URL: https://github.com/apache/kafka/pull/18770#issuecomment-2675376070 @chia7712 thanks for the review! > What's the difference between new and auto-quarantined? I'm a bit confused. These are the same concept. I have aligned on the term "new" since it is easier to type 😄 I also renamed AutoQuarantinedTestFilter to CatalogTestFilter and QuarantinedPostDiscoveryFilter to KafkaPostDiscoveryFilter -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [WIP] KIP-891: Connect Multiversion Support (Versioned Connector Creation and related changes) [kafka]
snehashisp commented on code in PR #17743: URL: https://github.com/apache/kafka/pull/17743#discussion_r1966088509 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ## @@ -1788,19 +1817,19 @@ public WorkerTask build() { Objects.requireNonNull(classLoader, "Classloader used by task cannot be null"); ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id); -final Class connectorClass = plugins.connectorClass( - connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG)); + +final Connector connector = instantiateConnector(connectorConfig.originalsStrings()); Review Comment: I use the connector instance in the metrics PR to get the version of the connector in [TaskPluginsMetadata](https://github.com/apache/kafka/blob/395a7223d99a508cf0d993b876abb105a1639bb1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskPluginsMetadata.java). I think instantiating a connector here to get the version is okay, but we can come back to this once we review the metrics PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
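For context, reading a connector's version from an instance relies on the Connect API's version() method (Connector implements Versioned). A minimal sketch, assuming direct reflective instantiation rather than the real Plugins/class-loader machinery:

{code:java}
import org.apache.kafka.connect.connector.Connector;

// Hedged sketch: instantiate a connector class just to read its version().
// Real Connect code resolves the class through Plugins and swaps class
// loaders; the bare newInstance() here is illustrative only.
public final class ConnectorVersions {
    public static String versionOf(Class<? extends Connector> connectorClass)
            throws ReflectiveOperationException {
        Connector connector = connectorClass.getDeclaredConstructor().newInstance();
        return connector.version(); // Connector implements Versioned
    }
}
{code}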
Re: [PR] KAFKA-18023: Enforcing Explicit Naming for Kafka Streams Internal Topics [kafka]
lucasbru commented on PR #18233: URL: https://github.com/apache/kafka/pull/18233#issuecomment-2675414685 LGTM. Could you check why there is a test failing? FAILED ❌ StreamsConfigTest > shouldNotLeakInternalDocMembers() looks related. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-18573) Add support for OAuth jwt-bearer grant type
[ https://issues.apache.org/jira/browse/KAFKA-18573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-18573: -- Labels: OAuth2 needs-kip (was: needs-kip) > Add support for OAuth jwt-bearer grant type > --- > > Key: KAFKA-18573 > URL: https://issues.apache.org/jira/browse/KAFKA-18573 > Project: Kafka > Issue Type: Improvement > Components: clients, security >Affects Versions: 4.0.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: OAuth2, needs-kip > Fix For: 4.1.0 > > > The current OAuth implementation allows for a narrow selection of use cases. > Users have asked for the ability to have more out-of-the box options > regarding configuration and exposing of some of the internal building blocks > for better reuse. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-18847) Refactor OAuth layer to improve reusability
Kirk True created KAFKA-18847: - Summary: Refactor OAuth layer to improve reusability Key: KAFKA-18847 URL: https://issues.apache.org/jira/browse/KAFKA-18847 Project: Kafka Issue Type: Improvement Components: clients, security Affects Versions: 4.0.0 Reporter: Kirk True Assignee: Kirk True Fix For: 4.1.0 The current OAuth implementation allows for a narrow selection of use cases. Users have asked for the ability to have more out-of-the box options regarding configuration and exposing of some of the internal building blocks for better reuse. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-18847) Refactor OAuth layer to improve reusability
[ https://issues.apache.org/jira/browse/KAFKA-18847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-18847: -- Description: The current OAuth implementation allows for a narrow selection of use cases and the code is structured accordingly. With KAFKA-18573, we want to extend support for OAuth so some internal refactoring needs to be performed. (was: The current OAuth implementation allows for a narrow selection of use cases. Users have asked for the ability to have more out-of-the box options regarding configuration and exposing of some of the internal building blocks for better reuse.) > Refactor OAuth layer to improve reusability > --- > > Key: KAFKA-18847 > URL: https://issues.apache.org/jira/browse/KAFKA-18847 > Project: Kafka > Issue Type: Improvement > Components: clients, security >Affects Versions: 4.0.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: OAuth2 > Fix For: 4.1.0 > > > The current OAuth implementation allows for a narrow selection of use cases > and the code is structured accordingly. With KAFKA-18573, we want to extend > support for OAuth so some internal refactoring needs to be performed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-18573) Add support for OAuth jwt-bearer grant type
[ https://issues.apache.org/jira/browse/KAFKA-18573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-18573: -- Summary: Add support for OAuth jwt-bearer grant type (was: Add support for OAuth jwt_bearer grant type) > Add support for OAuth jwt-bearer grant type > --- > > Key: KAFKA-18573 > URL: https://issues.apache.org/jira/browse/KAFKA-18573 > Project: Kafka > Issue Type: Improvement > Components: clients, security >Affects Versions: 4.0.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: needs-kip > Fix For: 4.1.0 > > > The current OAuth implementation allows for a narrow selection of use cases. > Users have asked for the ability to have more out-of-the box options > regarding configuration and exposing of some of the internal building blocks > for better reuse. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-18813: [1/N] ConsumerGroupHeartbeat API and ConsumerGroupDescribe API must check topic describe [kafka]
dajac commented on code in PR #18989: URL: https://github.com/apache/kafka/pull/18989#discussion_r1966104854 ## coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java: ## @@ -197,6 +199,11 @@ public Builder withExecutorService(ExecutorService executorService) { return this; } +public Builder withAuthorizer(Optional authorizer) { Review Comment: It should not go via the runtime. You can directly pass it to the shard. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18813: [1/N] ConsumerGroupHeartbeat API and ConsumerGroupDescribe API must check topic describe [kafka]
dajac commented on code in PR #18989: URL: https://github.com/apache/kafka/pull/18989#discussion_r1966110059 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -2592,6 +2607,32 @@ class KafkaApis(val requestChannel: RequestChannel, response.groups.addAll(results) } + // Clients are not allowed to see topics that are not authorized for Describe. + var topicsToCheck = Set[String]() + response.groups.forEach(_.members.forEach { member => +List(member.assignment, member.targetAssignment).foreach { assignment => + assignment.topicPartitions.asScala.foreach { tp => +topicsToCheck += tp.topicName + } +} + }) + val authorizedTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, Review Comment: I think that we need to check the authorization per group because groups may have completely different topics. Hence the caller maybe able to see group A but not group B. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18813: [1/N] ConsumerGroupHeartbeat API and ConsumerGroupDescribe API must check topic describe [kafka]
dajac commented on code in PR #18989: URL: https://github.com/apache/kafka/pull/18989#discussion_r1966111439 ## coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java: ## @@ -197,6 +199,11 @@ public Builder withExecutorService(ExecutorService executorService) { return this; } +public Builder withAuthorizer(Optional authorizer) { Review Comment: Check here: https://github.com/apache/kafka/blob/trunk/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java#L196 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18813: [1/N] ConsumerGroupHeartbeat API and ConsumerGroupDescribe API must check topic describe [kafka]
dajac commented on code in PR #18989: URL: https://github.com/apache/kafka/pull/18989#discussion_r1966113060 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -2533,7 +2561,7 @@ private boolean maybeUpdateRegularExpressions( executor.schedule( key, () -> refreshRegularExpressions(groupId, log, time, metadataImage, regexes), Review Comment: Would it be possible to do the filtering in `refreshRegularExpressions`? `refreshRegularExpressions` is executed in a different thread so we can take advantage of it too and avoid having to run the authorizer in the coordinator threads. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
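The suggestion above, moving the authorizer filtering onto the executor thread so the coordinator event loop never blocks on it, could be sketched like this. The helper names (resolveMatchingTopics, filterAuthorized) are hypothetical, not the real GroupMetadataManager API:

{code:java}
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Supplier;

// Hedged sketch: run both regex resolution and authorization filtering on the
// executor thread, keeping coordinator threads free of authorizer calls.
public final class AsyncRegexRefresh {
    public static CompletableFuture<Set<String>> refresh(
            Executor executor,
            Supplier<Set<String>> resolveMatchingTopics,        // regex resolution
            Function<Set<String>, Set<String>> filterAuthorized // authorizer filtering
    ) {
        return CompletableFuture.supplyAsync(() -> {
            Set<String> matched = resolveMatchingTopics.get();
            // Authorization runs here too, off the coordinator event loop.
            return filterAuthorized.apply(matched);
        }, executor);
    }
}
{code}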
Re: [PR] KAFKA-18813: [1/N] ConsumerGroupHeartbeat API and ConsumerGroupDescribe API must check topic describe [kafka]
dongnuo123 commented on code in PR #18989: URL: https://github.com/apache/kafka/pull/18989#discussion_r1966119154 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -2592,6 +2607,32 @@ class KafkaApis(val requestChannel: RequestChannel, response.groups.addAll(results) } + // Clients are not allowed to see topics that are not authorized for Describe. + var topicsToCheck = Set[String]() + response.groups.forEach(_.members.forEach { member => +List(member.assignment, member.targetAssignment).foreach { assignment => + assignment.topicPartitions.asScala.foreach { tp => +topicsToCheck += tp.topicName + } +} + }) + val authorizedTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, Review Comment: I don't quite follow this. `topicsToCheck` should have all the topics of all described groups, and we set group with error if it has topic that isn't in `authorizedTopics`, so it's still possible for the caller to see group A but not group B(?) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18614, KAFKA-18613: Add streams group request plumbing [kafka]
lucasbru commented on code in PR #18979: URL: https://github.com/apache/kafka/pull/18979#discussion_r1966098527 ## core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala: ## @@ -1675,6 +1675,12 @@ class KafkaConfigTest { assertEquals(Set(GroupType.CLASSIC, GroupType.CONSUMER, GroupType.SHARE), config.groupCoordinatorRebalanceProtocols) assertTrue(config.isNewGroupCoordinatorEnabled) assertTrue(config.shareGroupConfig.isShareGroupEnabled) + +// This is OK. Review Comment: You are right, can be removed. I was just following the style in rest of the test. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -609,6 +612,34 @@ public List shareGroupDescribe( return describedGroups; } +/** + * Handles a StreamsGroupDescribe request. + * + * @param groupIds The IDs of the groups to describe. + * @param committedOffset A specified committed offset corresponding to this shard. + * + * @return A list containing the StreamsGroupDescribeResponseData.DescribedGroup. Review Comment: Good idea. Done ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -682,6 +725,58 @@ public CompletableFuture> return FutureUtils.combineFutures(futures, ArrayList::new, List::addAll); } +/** + * See {@link GroupCoordinator#streamsGroupDescribe(RequestContext, List)}. + */ +@Override +public CompletableFuture> streamsGroupDescribe( +RequestContext context, +List groupIds +) { +if (!isActive.get()) { +return CompletableFuture.completedFuture(StreamsGroupDescribeRequest.getErrorDescribedGroupList( +groupIds, +Errors.COORDINATOR_NOT_AVAILABLE +)); +} + +final List>> futures = +new ArrayList<>(groupIds.size()); +final Map> groupsByTopicPartition = new HashMap<>(); Review Comment: The group coordinator is sharded by topic partition of the consumer offset topic. So we group the group IDs by the topic partitions of the consumer offset, which acts as an "address" of the right group coordinator. We fetch the described groups by instance and merge the results. 
## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -8944,6 +8947,113 @@ public void testConsumerGroupDescribeBeforeAndAfterCommittingOffset() { assertEquals(expected, actual); } +@Test +public void testStreamsGroupDescribeNoErrors() { +List streamsGroupIds = Arrays.asList("group-id-1", "group-id-2"); +int epoch = 10; +String memberId = "member-id"; +StreamsGroupMember.Builder memberBuilder = streamsGroupMemberBuilderWithDefaults(memberId) +.setClientTags(Collections.singletonMap("clientTag", "clientValue")) +.setProcessId("processId") +.setMemberEpoch(epoch) +.setPreviousMemberEpoch(epoch - 1); + +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withStreamsGroup(new StreamsGroupBuilder(streamsGroupIds.get(0), epoch)) +.withStreamsGroup(new StreamsGroupBuilder(streamsGroupIds.get(1), epoch) +.withMember(memberBuilder.build())) +.build(); + +List expected = Arrays.asList( +new StreamsGroupDescribeResponseData.DescribedGroup() +.setGroupEpoch(epoch) +.setGroupId(streamsGroupIds.get(0)) +.setGroupState(StreamsGroupState.EMPTY.toString()) +.setAssignmentEpoch(0), +new StreamsGroupDescribeResponseData.DescribedGroup() +.setGroupEpoch(epoch) +.setGroupId(streamsGroupIds.get(1)) +.setMembers(Collections.singletonList( +memberBuilder.build().asStreamsGroupDescribeMember( +TasksTuple.EMPTY +) +)) +.setGroupState(StreamsGroupState.NOT_READY.toString()) +); +List actual = context.sendStreamsGroupDescribe(streamsGroupIds); + +assertEquals(expected, actual); +} + +@Test +public void testStreamsGroupDescribeWithErrors() { +String groupId = "groupId"; + Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
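The fan-out pattern lucasbru describes above (bucket group IDs by the owning __consumer_offsets partition, issue one describe per shard, then merge the futures) can be sketched generically. All names here are illustrative, not the GroupCoordinatorService API:

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hedged sketch: group IDs are bucketed by the coordinator shard that owns
// them, one describe request is issued per shard, and the per-shard futures
// are merged into a single result list.
public final class ShardedDescribe {
    public static <T> CompletableFuture<List<T>> describeAll(
            List<String> groupIds,
            Function<String, Integer> partitionFor, // groupId -> offsets partition
            BiFunction<Integer, List<String>, CompletableFuture<List<T>>> describeShard
    ) {
        Map<Integer, List<String>> byShard = new HashMap<>();
        for (String groupId : groupIds) {
            byShard.computeIfAbsent(partitionFor.apply(groupId), p -> new ArrayList<>()).add(groupId);
        }
        List<CompletableFuture<List<T>>> futures = new ArrayList<>();
        byShard.forEach((partition, ids) -> futures.add(describeShard.apply(partition, ids)));
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> {
                List<T> merged = new ArrayList<>();
                futures.forEach(f -> merged.addAll(f.join()));
                return merged;
            });
    }
}
{code}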
Re: [PR] KAFKA-18813: [1/N] ConsumerGroupHeartbeat API and ConsumerGroupDescribe API must check topic describe [kafka]
dajac commented on code in PR #18989: URL: https://github.com/apache/kafka/pull/18989#discussion_r1966115642 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -2698,6 +2741,60 @@ private CoordinatorResult handleRegularExpressionsResul return new CoordinatorResult<>(records); } +/** + * This method filters the topics in the resolvedRegularExpressions + * that the member is authorized to describe. + * + * @param context The request context. + * @param resolvedRegularExpressionsThe list of topic names to validate. + * @return The set of topics that the member is not authorized to describe. + */ +private Set filterTopicDescribeAuthorizedTopics( +RequestContext context, +Map resolvedRegularExpressions +) { +if (authorizer.isPresent()) { +Map topicNameCount = new HashMap<>(); + resolvedRegularExpressions.values().forEach(resolvedRegularExpression -> +resolvedRegularExpression.topics.forEach(topicName -> +topicNameCount.compute(topicName, Utils::incValue) +) +); + +List actions = topicNameCount.entrySet().stream().map(entry -> { +ResourcePattern resource = new ResourcePattern(TOPIC, entry.getKey(), LITERAL); +return new Action(DESCRIBE, resource, entry.getValue(), true, true); +}).collect(Collectors.toList()); + +List authorizationResults = authorizer.get().authorize(context, actions); +Set deniedTopics = new HashSet<>(); +IntStream.range(0, actions.size()).forEach(i -> { +if (authorizationResults.get(i) == AuthorizationResult.DENIED) { +String deniedTopic = actions.get(i).resourcePattern().name(); +deniedTopics.add(deniedTopic); +} +}); + +resolvedRegularExpressions.forEach((regex, resolvedRegularExpression) -> { +if (resolvedRegularExpression.topics.stream().anyMatch(deniedTopics::contains)) { Review Comment: This line may not be necessary. We could directly filter the list and avoid iterating it many times. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
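The single-pass filtering suggested above (drop the anyMatch pre-check and filter the resolved topic list directly against the denied set) might look like this; the names are illustrative:

{code:java}
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hedged sketch: one stream pass instead of anyMatch followed by a rebuild.
public final class RegexTopicFilter {
    public static List<String> authorizedTopics(List<String> resolvedTopics, Set<String> deniedTopics) {
        return resolvedTopics.stream()
            .filter(topic -> !deniedTopics.contains(topic))
            .collect(Collectors.toList());
    }
}
{code}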
[jira] [Updated] (KAFKA-13788) Creation of invalid dynamic config prevents further creation of valid configs
[ https://issues.apache.org/jira/browse/KAFKA-13788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-13788: Fix Version/s: (was: 4.0.0) > Creation of invalid dynamic config prevents further creation of valid configs > - > > Key: KAFKA-13788 > URL: https://issues.apache.org/jira/browse/KAFKA-13788 > Project: Kafka > Issue Type: Bug > Components: config >Affects Versions: 2.8.0 >Reporter: Prateek Agarwal >Assignee: Deng Ziming >Priority: Minor > > Kafka currently allows creating an unknown dynamic config without any errors. > But it errors when next valid dynamic config gets created. > This can be seen locally in a cluster by creating a wrong config > {{log.cleaner.threadzz}} which was preventing creation of the valid config > later {{log.cleaner.threads}}. > {code} > # Invalid config 'log.cleaner.threadzz' gets created without issues > $ ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter > --add-config log.cleaner.threadzz=2 --entity-type brokers --entity-default 2>1 > Completed updating default config for brokers in the cluster. > {code} > Now when a valid config is added, {{kafka-configs.sh}} errors out: > {code} > $ ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter > --add-config log.cleaner.threads=2 --entity-type brokers --entity-default > All sensitive broker config entries must be specified for --alter, missing > entries: Set(log.cleaner.threadzz) > {code} > To fix this, one needs to first delete the incorrect config: > {code:java} > $ ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter > --delete-config log.cleaner.threadzz --entity-type brokers --entity-default > {code} > But ideally, the invalid config should error out so that creation of the > valid config doesn't get prevented. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-18837: Validate controller.quorum.fetch.timeout.ms is a positive value [kafka]
TaiJuWu opened a new pull request, #18998: URL: https://github.com/apache/kafka/pull/18998 Jira: https://issues.apache.org/jira/browse/KAFKA-18837 We currently don't have a Validator when defining the controller.quorum.fetch.timeout.ms configuration. Setting it to 0 or a negative value does not make sense and currently prevents the broker from starting. With a validator, the error would instead appear when the storage is formatted. Also, it would show a meaningful valid-values field in the broker configuration documentation. This is likely to apply to other quorum configurations. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
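A validator of the kind this PR describes can be attached through ConfigDef's Range. A minimal sketch, with an illustrative default value rather than the real quorum default:

{code:java}
import org.apache.kafka.common.config.ConfigDef;

import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;

// Hedged sketch: attaching atLeast(1) rejects 0 and negative timeouts at
// validation time and documents the valid range automatically.
public final class QuorumConfigSketch {
    public static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define(
            "controller.quorum.fetch.timeout.ms",
            ConfigDef.Type.INT,
            2000, // illustrative default, not necessarily the real one
            atLeast(1),
            ConfigDef.Importance.HIGH,
            "Maximum time without a successful fetch from the leader before triggering a new election."
        );
}
{code}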
Re: [PR] KAFKA-18813: [1/N] ConsumerGroupHeartbeat API and ConsumerGroupDescribe API must check topic describe [kafka]
dongnuo123 commented on code in PR #18989: URL: https://github.com/apache/kafka/pull/18989#discussion_r1966119154 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -2592,6 +2607,32 @@ class KafkaApis(val requestChannel: RequestChannel, response.groups.addAll(results) } + // Clients are not allowed to see topics that are not authorized for Describe. + var topicsToCheck = Set[String]() + response.groups.forEach(_.members.forEach { member => +List(member.assignment, member.targetAssignment).foreach { assignment => + assignment.topicPartitions.asScala.foreach { tp => +topicsToCheck += tp.topicName + } +} + }) + val authorizedTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, Review Comment: I don't quite follow this. `topicsToCheck` should have all the topics of all described groups, and we loop through all the groups and set a group with error if it has topic that isn't in `authorizedTopics`, so it's still possible for the caller to see group A but not group B(?) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18461: Fix potential NPE in setDelta after map is erased [kafka]
cmccabe commented on PR #18684: URL: https://github.com/apache/kafka/pull/18684#issuecomment-2675502963 Setting this to null was deliberate to catch use-after-free bugs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
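The design choice cmccabe points out above, deliberately nulling internal state so a use-after-free surfaces as an immediate failure near the bug, is a common fail-fast pattern. A generic illustration with hypothetical names, not the actual snapshot-registry internals:

{code:java}
// Hedged sketch of the fail-fast pattern: drop the reference on erase so any
// later access fails loudly instead of silently reading stale state.
public final class ErasableDelta {
    private Object delta = new Object();

    public void erase() {
        delta = null; // deliberate: use-after-erase now throws close to the bug
    }

    public Object delta() {
        if (delta == null) {
            throw new IllegalStateException("delta accessed after erase()");
        }
        return delta;
    }
}
{code}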
Re: [PR] KAFKA-18808: add test to ensure the name= is not equal to default quota [kafka]
chia7712 commented on PR #18966: URL: https://github.com/apache/kafka/pull/18966#issuecomment-2675255945 It seems to me the unit test can't cover the case as it does not run the server code. Could you please use integration test to rewrite it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18795: Remove `Records#downConvert` [kafka]
chia7712 merged PR #18897: URL: https://github.com/apache/kafka/pull/18897 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-18795) Remove Records#downConvert
[ https://issues.apache.org/jira/browse/KAFKA-18795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-18795. Resolution: Fixed > Remove Records#downConvert > -- > > Key: KAFKA-18795 > URL: https://issues.apache.org/jira/browse/KAFKA-18795 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: kangning.li >Priority: Major > Fix For: 4.1.0 > > > Since we no longer convert records to the old format for fetch requests, this > code is no longer used in production. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-17622) Kafka Streams Timeout During Partition Rebalance
[ https://issues.apache.org/jira/browse/KAFKA-17622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17929081#comment-17929081 ] Matthias J. Sax edited comment on KAFKA-17622 at 2/21/25 5:35 PM: -- KIP-1094: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1094%3A+Add+a+new+constructor+method+with+nextOffsets+to+ConsumerRecords] (PR: [https://github.com/apache/kafka/pull/17414]) along with this fix: [https://github.com/apache/kafka/pull/17091] prevent the timeout issue in such scenarios. was (Author: JIRAUSER298811): KIP-1094: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1094%3A+Add+a+new+constructor+method+with+nextOffsets+to+ConsumerRecords] (PR: [https://github.com/apache/kafka/pull/17414)] along with this fix: [https://github.com/apache/kafka/pull/17091] prevent the timeout issue in such scenarios. > Kafka Streams Timeout During Partition Rebalance > - > > Key: KAFKA-17622 > URL: https://issues.apache.org/jira/browse/KAFKA-17622 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Alieh Saeedi >Assignee: Alieh Saeedi >Priority: Major > Fix For: 4.0.0 > > > Re: > [https://forum.confluent.io/t/kafka-streams-timeout-during-partition-rebalance-seeking-insights-on-notleaderorfollowerexception/11362] > Calling {{Consumer.position()}} from KStreams for computing the offset > that must be committed suffers from a race condition so that by the time we > want to commit, the position may be gone. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-18844: Fix flaky QuorumControllerTest.testBalancePartitionLeaders() [kafka]
ijuma commented on PR #18997: URL: https://github.com/apache/kafka/pull/18997#issuecomment-2675079361 Yes, if it's mandatory, then we should set it in the constructor. Any issues doing that? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15900, KAFKA-18310: fix flaky test testOutdatedCoordinatorAssignment and AbstractCoordinatorTest [kafka]
lianetm commented on code in PR #18945: URL: https://github.com/apache/kafka/pull/18945#discussion_r1963837019 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java: ## @@ -1561,28 +1555,25 @@ public void onFailure(RuntimeException e) { } } catch (AuthenticationException e) { log.error("An authentication error occurred in the heartbeat thread", e); -this.failed.set(e); +setFailed(e); } catch (GroupAuthorizationException e) { log.error("A group authorization error occurred in the heartbeat thread", e); -this.failed.set(e); +setFailed(e); } catch (InterruptedException | InterruptException e) { Thread.interrupted(); log.error("Unexpected interrupt received in heartbeat thread", e); -this.failed.set(new RuntimeException(e)); +setFailed(new RuntimeException(e)); } catch (Throwable e) { log.error("Heartbeat thread failed due to unexpected error", e); if (e instanceof RuntimeException) -this.failed.set((RuntimeException) e); +setFailed((RuntimeException) e); else -this.failed.set(new RuntimeException(e)); +setFailed(new RuntimeException(e)); } finally { log.debug("Heartbeat thread has closed"); -synchronized (AbstractCoordinator.this) { Review Comment: Don't we need to keep this synchronization here? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java: ## @@ -230,7 +230,8 @@ public class ClassicKafkaConsumer implements ConsumerDelegate { this.interceptors, config.getBoolean(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED), config.getString(ConsumerConfig.CLIENT_RACK_CONFIG), -clientTelemetryReporter); +clientTelemetryReporter, +Optional.empty()); Review Comment: we're introducing this param only for some tests, but end up having to pass it empty in lots of cases (this consumer class mainly, and other test files). So I wonder if it would be a fair trade-off in this case to add another constructor to the `ConsumerCoordinator` to take this param, but also keep the existing one that does not take it. Seems that we would reduce the scope of the changes and avoid the noise of this empty param when not needed. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatThread.java: ## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.utils.KafkaThread; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +public abstract class AbstractHeartbeatThread extends KafkaThread implements AutoCloseable { Review Comment: Should we add a Javadoc to show that this is a wrapper for a KafkaThread that allows it to be enabled/disabled (looks like the main thing right?).
And seeing it like this, is it intentionally abstract? No harm in it really if the intention is to express that we don't want to allow instances of it, but caught my attention (vs. non-abstract ~BaseHeartbeatThread) ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockHeartbeatThread.java: ## @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0
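The constructor-overload approach suggested in the review above (keep the existing production constructor and add an overload for the test-only parameter) is a standard way to avoid Optional.empty() noise at call sites. A hedged sketch with illustrative names, not the real ConsumerCoordinator signature:

{code:java}
import java.util.Optional;

// Hedged sketch: the production constructor delegates with Optional.empty(),
// so only tests that need the hook touch the extended overload.
public class CoordinatorWithHook {
    private final Optional<Runnable> heartbeatHook;

    // Production constructor: call sites stay free of Optional.empty().
    public CoordinatorWithHook() {
        this(Optional.empty());
    }

    // Test-only overload used where the hook is actually needed.
    public CoordinatorWithHook(Optional<Runnable> heartbeatHook) {
        this.heartbeatHook = heartbeatHook;
    }

    public void maybeRunHook() {
        heartbeatHook.ifPresent(Runnable::run);
    }
}
{code}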
Re: [PR] KAFKA-18813: [1/N] ConsumerGroupHeartbeat API and ConsumerGroupDescribe API must check topic describe [kafka]
dajac commented on code in PR #18989: URL: https://github.com/apache/kafka/pull/18989#discussion_r1966157730 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -2592,6 +2607,32 @@ class KafkaApis(val requestChannel: RequestChannel, response.groups.addAll(results) } + // Clients are not allowed to see topics that are not authorized for Describe. + var topicsToCheck = Set[String]() + response.groups.forEach(_.members.forEach { member => +List(member.assignment, member.targetAssignment).foreach { assignment => + assignment.topicPartitions.asScala.foreach { tp => +topicsToCheck += tp.topicName + } +} + }) + val authorizedTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, Review Comment: No. Imagine the followings groups: - Group A with topics a and b - Group B with topics c and d Caller may have access to topics a and b but not c and d. In this case, Group A should be returned but not B. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18522: Slice records for share fetch [kafka]
apoorvmittal10 commented on PR #18804: URL: https://github.com/apache/kafka/pull/18804#issuecomment-2675485219

> @apoorvmittal10: Thanks for the updated PR. The code LGTM. Are the failed tests related to this PR?

Thanks @junrao. The build passes on Java 17, and the failure on Java 23 is unrelated to this PR.
Re: [PR] KAFKA-18813: [1/N] ConsumerGroupHeartbeat API and ConsumerGroupDescribe API must check topic describe [kafka]
dongnuo123 commented on code in PR #18989: URL: https://github.com/apache/kafka/pull/18989#discussion_r1966161261

## core/src/main/scala/kafka/server/KafkaApis.scala: @@ -2592,6 +2607,32 @@ (same hunk as quoted above)

Review Comment: So we just remove the group from the response? I.e., in the above case, should we return

```
DescribeResponse(
  DescribedGroup(A, members(...))
)
```

instead of

```
DescribeResponse(
  DescribedGroup(A, members(...))
  DescribedGroup(B, error=TOPIC_AUTH_FAILURE)
)
```

?
Re: [PR] KAFKA-18813: [1/N] ConsumerGroupHeartbeat API and ConsumerGroupDescribe API must check topic describe [kafka]
dajac commented on code in PR #18989: URL: https://github.com/apache/kafka/pull/18989#discussion_r1966164105

## core/src/main/scala/kafka/server/KafkaApis.scala: @@ -2592,6 +2607,32 @@ (same hunk as quoted above)

Review Comment:

```
DescribeResponse(
  DescribedGroup(A, members(...))
  DescribedGroup(B, error=TOPIC_AUTH_FAILURE)
)
```

This is the correct one.
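To make the agreed semantics concrete, here is a small, hypothetical Java sketch of the filtering rule dajac describes: a group stays in the response but is flagged with a topic authorization error if it references any topic the caller cannot describe. `DescribedGroup`, its `error` field, and the error string are simplified stand-ins, not the actual KafkaApis or response types:

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Simplified stand-in for a group entry in the describe response.
record DescribedGroup(String groupId, Set<String> topics, String error) {}

public class GroupAuthFilter {
    // Keep every group, but flag those touching unauthorized topics;
    // the flagged entry carries an error and no member/topic details.
    static List<DescribedGroup> filter(List<DescribedGroup> groups, Set<String> authorizedTopics) {
        return groups.stream()
            .map(g -> authorizedTopics.containsAll(g.topics())
                ? g
                : new DescribedGroup(g.groupId(), Set.of(), "TOPIC_AUTHORIZATION_FAILED"))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<DescribedGroup> groups = List.of(
            new DescribedGroup("A", Set.of("a", "b"), null),
            new DescribedGroup("B", Set.of("c", "d"), null));
        // Caller is authorized to describe topics a and b only.
        System.out.println(filter(groups, Set.of("a", "b")));
    }
}
```

Run against dajac's example (caller authorized for a and b only), group A comes back intact and group B comes back as an error entry, matching the second response shape above.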
Re: [PR] KAFKA-18813: [3/N] Client support for TopicAuthException in DescribeConsumerGroup path [kafka]
lianetm commented on PR #18996: URL: https://github.com/apache/kafka/pull/18996#issuecomment-2675188273

Cherry-picked to 4.0: https://github.com/apache/kafka/commit/364479cf24d9829d84073f490aa864eb6fa3540f
Re: [PR] MINOR: Fix Some Exceptions in MetaPropertiesEnsemble and MetadataQuorumCommand [kafka]
jsancio commented on code in PR #18957: URL: https://github.com/apache/kafka/pull/18957#discussion_r1965923366

## tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java:

```diff
@@ -404,16 +404,16 @@ static Set<RaftVoterEndpoint> getControllerAdvertisedListeners(
         }
         LinkedHashSet<RaftVoterEndpoint> results = new LinkedHashSet<>();
         for (String listenerName : props.getProperty(
-            KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG).split(",")) {
+                KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG).split(",")) {
```

Review Comment: How about this indentation and formatting, so that the for-loop statement block is clearly separated from the for-loop iterable?

```java
for (String listenerName : props.getProperty(
        KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG).split(",")
) {
```

## tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java:

```diff
@@ -404,16 +404,16 @@ static Set<RaftVoterEndpoint> getControllerAdvertisedListeners(
         }
         LinkedHashSet<RaftVoterEndpoint> results = new LinkedHashSet<>();
         for (String listenerName : props.getProperty(
-            KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG).split(",")) {
+                KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG).split(",")) {
             listenerName = ListenerName.normalised(listenerName).value();
             Endpoint endpoint = listeners.get(listenerName);
             if (endpoint == null) {
                 throw new TerseException("Cannot find information about controller listener name: " + listenerName);
             }
             results.add(new RaftVoterEndpoint(endpoint.listenerName().get(),
-                endpoint.host() == null ? "localhost" : endpoint.host(),
-                endpoint.port()));
+                    endpoint.host() == null ? "localhost" : endpoint.host(),
+                    endpoint.port()));
```

Review Comment: How about this formatting?

```java
results.add(
    new RaftVoterEndpoint(
        endpoint.listenerName().get(),
        endpoint.host() == null ? "localhost" : endpoint.host(),
        endpoint.port()
    )
);
```

## tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java:

```diff
@@ -266,13 +266,13 @@ else if (leader.lastCaughtUpTimestamp().isPresent() && maxLagFollower.lastCaught
         System.out.println(
             "ClusterId: " + clusterId +
-            "\nLeaderId: " + quorumInfo.leaderId() +
-            "\nLeaderEpoch:" + quorumInfo.leaderEpoch() +
-            "\nHighWatermark: " + quorumInfo.highWatermark() +
-            "\nMaxFollowerLag: " + maxFollowerLag +
-            "\nMaxFollowerLagTimeMs: " + maxFollowerLagTimeMs +
-            "\nCurrentVoters: " + printVoterState(quorumInfo) +
-            "\nCurrentObservers: " + printObserverState(quorumInfo)
+                "\nLeaderId: " + quorumInfo.leaderId() +
+                "\nLeaderEpoch:" + quorumInfo.leaderEpoch() +
+                "\nHighWatermark: " + quorumInfo.highWatermark() +
+                "\nMaxFollowerLag: " + maxFollowerLag +
+                "\nMaxFollowerLagTimeMs: " + maxFollowerLagTimeMs +
+                "\nCurrentVoters: " + printVoterState(quorumInfo) +
+                "\nCurrentObservers: " + printObserverState(quorumInfo)
```

Review Comment: This indentation doesn't look correct. We indent Java code using 4 spaces.