[GitHub] [kafka] chia7712 opened a new pull request #9128: KAFKA-7540 reduce session timeout to evict dead member in time and so…
chia7712 opened a new pull request #9128: URL: https://github.com/apache/kafka/pull/9128 issue: https://issues.apache.org/jira/browse/KAFKA-7540 In short, the LEAVE_GROUP request is sent to a broker that has already been shut down, so the dead member left in the group prevents the broker from completing the rebalance. The test case creates two consumers (in different groups) and then shuts down their coordinators (brokers). After the first coordinator is shut down, the heartbeat thread of the first consumer times out, so it sends a LEAVE_GROUP request to the “next” coordinator. Unfortunately, the LEAVE_GROUP request can’t be processed successfully if the “next” coordinator is the coordinator of the second consumer (which has also been shut down). The simple approach is to reduce the session timeout so the coordinator can evict the dead member in time. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
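For context, a minimal sketch of how a test consumer might be configured with a shorter session timeout so the coordinator evicts a dead member quickly. The property names are the standard consumer configs; the concrete values and group name are illustrative, not taken from the PR.

```scala
import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1")                   // illustrative group name
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "6000")            // short session timeout so a dead member is evicted quickly
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "2000")         // keep heartbeats well below the session timeout
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
val consumer = new KafkaConsumer[String, String](props)
```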
[GitHub] [kafka] chia7712 commented on pull request #9128: KAFKA-7540 reduce session timeout to evict dead member in time and so…
chia7712 commented on pull request #9128: URL: https://github.com/apache/kafka/pull/9128#issuecomment-669020702 I have looped the test 40 times; all passed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] omkreddy commented on pull request #9050: KAFKA-10193: Add preemption for controller events that have callbacks
omkreddy commented on pull request #9050: URL: https://github.com/apache/kafka/pull/9050#issuecomment-669040004 ok to test This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r465565925 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -266,6 +275,178 @@ class KafkaController(val config: KafkaConfig, } } + private def createFeatureZNode(newNode: FeatureZNode): Int = { +info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: $newNode") +zkClient.createFeatureZNode(newNode) +val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path) +newVersion + } + + private def updateFeatureZNode(updatedNode: FeatureZNode): Int = { +info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: $updatedNode") +zkClient.updateFeatureZNode(updatedNode) + } + + /** + * This method enables the feature versioning system (KIP-584). + * + * Development in Kafka (from a high level) is organized into features. Each feature is tracked by + * a name and a range of version numbers. A feature can be of two types: + * + * 1. Supported feature: + * A supported feature is represented by a name (String) and a range of versions (defined by a + * {@link SupportedVersionRange}). It refers to a feature that a particular broker advertises + * support for. Each broker advertises the version ranges of its own supported features in its + * own BrokerIdZNode. The contents of the advertisement are specific to the particular broker and + * do not represent any guarantee of a cluster-wide availability of the feature for any particular + * range of versions. + * + * 2. Finalized feature: + * A finalized feature is represented by a name (String) and a range of version levels (defined + * by a {@link FinalizedVersionRange}). Whenever the feature versioning system (KIP-584) is + * enabled, the finalized features are stored in the cluster-wide common FeatureZNode. + * In comparison to a supported feature, the key difference is that a finalized feature exists + * in ZK only when it is guaranteed to be supported by any random broker in the cluster for a + * specified range of version levels. Also, the controller is the only entity modifying the + * information about finalized features. + * + * This method sets up the FeatureZNode with enabled status, which means that the finalized + * features stored in the FeatureZNode are active. The enabled status should be written by the + * controller to the FeatureZNode only when the broker IBP config is greater than or equal to + * KAFKA_2_7_IV0. + * + * There are multiple cases handled here: + * + * 1. New cluster bootstrap: + *A new Kafka cluster (i.e. it is deployed first time) is almost always started with IBP config + *setting greater than or equal to KAFKA_2_7_IV0. We would like to start the cluster with all + *the possible supported features finalized immediately. Assuming this is the case, the + *controller will start up and notice that the FeatureZNode is absent in the new cluster, + *it will then create a FeatureZNode (with enabled status) containing the entire list of + *default supported features as its finalized features. + * + * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0: + *Imagine there is an existing Kafka cluster with IBP config less than KAFKA_2_7_IV0, and the + *broker binary has been upgraded to a newer version that supports the feature versioning + *system (KIP-584). This means the user is upgrading from an earlier version of the broker + *binary. 
In this case, we want to start with no finalized features and allow the user to + *finalize them whenever they are ready i.e. in the future whenever the user sets IBP config + *to be greater than or equal to KAFKA_2_7_IV0, then the user could start finalizing the + *features. This process ensures we do not enable all the possible features immediately after + *an upgrade, which could be harmful to Kafka. + *This is how we handle such a case: + * - Before the IBP config upgrade (i.e. IBP config set to less than KAFKA_2_7_IV0), the + *controller will start up and check if the FeatureZNode is absent. If absent, it will + *react by creating a FeatureZNode with disabled status and empty finalized features. + *Otherwise, if a node already exists in enabled status then the controller will just + *flip the status to disabled and clear the finalized features. + * - After the IBP config upgrade (i.e. IBP config set to greater than or equal to + *KAFKA_2_7_IV0), when the controller starts up it will check if the FeatureZNode exists + *and whether it is disabled. In such a case, it won’t upgrade all features immediately. + *Instead it will just switch the FeatureZNode status to enabled status. This lets
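To make the bootstrap cases above easier to follow, here is a standalone, simplified sketch of the decision the comment describes. It is not the PR's code: `FeatureZNodeState`, its status objects, and the version-range representation are hypothetical stand-ins for the real FeatureZNode types, and the actual controller also consults ZooKeeper versions and its own context.

```scala
sealed trait FeatureZNodeStatus
case object Enabled extends FeatureZNodeStatus
case object Disabled extends FeatureZNodeStatus

// Hypothetical stand-in for the contents of the cluster-wide FeatureZNode.
case class FeatureZNodeState(status: FeatureZNodeStatus, finalizedFeatures: Map[String, (Short, Short)])

def bootstrapFeatureZNode(existing: Option[FeatureZNodeState],
                          ibpAtLeastKafka27: Boolean,
                          defaultSupportedFeatures: Map[String, (Short, Short)]): FeatureZNodeState = {
  if (!ibpAtLeastKafka27) {
    // Case 2, before the IBP upgrade: keep the versioning system disabled with no finalized features,
    // whether the node was absent or previously enabled.
    FeatureZNodeState(Disabled, Map.empty)
  } else {
    existing match {
      // Case 1, new cluster bootstrap: finalize all default supported features immediately.
      case None => FeatureZNodeState(Enabled, defaultSupportedFeatures)
      // Case 2, after the IBP upgrade: only flip the status to enabled; do not force-finalize features.
      case Some(node) if node.status == Disabled => node.copy(status = Enabled)
      // Already enabled: leave the finalized features as they are.
      case Some(node) => node
    }
  }
}
```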
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r463934710 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -266,6 +275,179 @@ class KafkaController(val config: KafkaConfig, } } + private def createFeatureZNode(newNode: FeatureZNode): Int = { +info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: $newNode") +zkClient.createFeatureZNode(newNode) +val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path) +newVersion + } + + private def updateFeatureZNode(updatedNode: FeatureZNode): Int = { +info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: $updatedNode") +zkClient.updateFeatureZNode(updatedNode) + } + + /** + * This method enables the feature versioning system (KIP-584). + * + * Development in Kafka (from a high level) is organized into features. Each feature is tracked by + * a name and a range of version numbers. A feature can be of two types: + * + * 1. Supported feature: + * A supported feature is represented by a name (String) and a range of versions (defined by a + * {@link SupportedVersionRange}). It refers to a feature that a particular broker advertises + * support for. Each broker advertises the version ranges of it’s own supported features in its + * own BrokerIdZnode. The contents of the advertisement are specific to the particular broker and + * do not represent any guarantee of a cluster-wide availability of the feature for any particular + * range of versions. + * + * 2. Finalized feature: + * A finalized feature is is represented by a name (String) and a range of version levels (defined + * by a {@link FinalizedVersionRange}). Whenever the feature versioning system (KIP-584) is + * enabled, the finalized features are stored in ZK in the cluster-wide common FeatureZNode. + * In comparison to a supported feature, the key difference is that a finalized feature exists + * in ZK only when it is guaranteed to be supported by any random broker in the cluster for a + * specified range of version levels. Also, the controller is the one and only entity modifying + * the information about finalized features and their version levels. + * + * This method sets up the FeatureZNode with enabled status. This status means the feature + * versioning system (KIP-584) is enabled, and, the finalized features stored in the FeatureZNode + * are active. This status should be written by the controller to the FeatureZNode only when the + * broker IBP config is greater than or equal to KAFKA_2_7_IV0. + * + * There are multiple cases handled here: + * + * 1. New cluster bootstrap: + *A new Kafka cluster (i.e. it is deployed first time) is almost always started with IBP config + *setting greater than or equal to KAFKA_2_7_IV0. We would like to start the cluster with all + *the possible supported features finalized immediately. Assuming this is the case, the + *controller will start up and notice that the FeatureZNode is absent in the new cluster, + *it will then create a FeatureZNode (with enabled status) containing the entire list of + *default supported features as its finalized features. + * + * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0: + *Imagine there is an existing Kafka cluster with IBP config less than KAFKA_2_7_IV0, and the + *Broker binary has been upgraded to a newer version that supports the feature versioning + *system (KIP-584). This means the user is upgrading from an earlier version of the Broker + *binary. 
In this case, we want to start with no finalized features and allow the user to + *finalize them whenever they are ready i.e. in the future whenever the user sets IBP config + *to be greater than or equal to KAFKA_2_7_IV0, then the user could start finalizing the + *features. The reason to do this is that enabling all the possible features immediately after + *an upgrade could be harmful to the cluster. + *This is how we handle such a case: + * - Before the IBP config upgrade (i.e. IBP config set to less than KAFKA_2_7_IV0), the + *controller will start up and check if the FeatureZNode is absent. If absent, then it + *will react by creating a FeatureZNode with disabled status and empty finalized features. + *Otherwise, if a node already exists in enabled status then the controller will just + *flip the status to disabled and clear the finalized features. + * - After the IBP config upgrade (i.e. IBP config set to greater than or equal to + *KAFKA_2_7_IV0), when the controller starts up it will check if the FeatureZNode exists + *and whether it is disabled. In such a case, it won’t upgrade all features immediat
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r465570359 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -2945,6 +2948,130 @@ class KafkaApis(val requestChannel: RequestChannel, } } + def handleUpdateFeatures(request: RequestChannel.Request): Unit = { Review comment: This does not seem to be required, since it is already achieved via `UpdateFeaturesTest`. In fact, there we test using the admin client, which is even better as it tests e2e client-to-server functionality. What do we gain by adding the additional tests in `KafkaApisTest`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r465572011 ## File path: clients/src/main/resources/common/message/UpdateFeaturesResponse.json ## @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 50, + "type": "response", + "name": "UpdateFeaturesResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ Review comment: I don't see that we consistently use a top level error code, so I will leave it as it is. ## File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java ## @@ -1214,6 +1215,71 @@ default AlterClientQuotasResult alterClientQuotas(Collection entries, AlterClientQuotasOptions options); +/** + * Describes finalized as well as supported features. By default, the request is issued to any + * broker. It can be optionally directed only to the controller via DescribeFeaturesOptions + * parameter. This is particularly useful if the user requires strongly consistent reads of + * finalized features. + * + * The following exceptions can be anticipated when calling {@code get()} on the future from the + * returned {@link DescribeFeaturesResult}: + * + * {@link org.apache.kafka.common.errors.TimeoutException} + * If the request timed out before the describe operation could finish. + * + * + * @param options the options to use + * + * @return the {@link DescribeFeaturesResult} containing the result + */ +DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options); + +/** + * Applies specified updates to finalized features. This operation is not transactional so it + * may succeed for some features while fail for others. + * + * The API takes in a map of finalized feature name to {@link FeatureUpdate} that needs to be + * applied. Each entry in the map specifies the finalized feature to be added or updated or + * deleted, along with the new max feature version level value. This request is issued only to + * the controller since the API is only served by the controller. The return value contains an + * error code for each supplied {@link FeatureUpdate}, and the code indicates if the update + * succeeded or failed in the controller. + * + * Downgrade of feature version level is not a regular operation/intent. It is only allowed + * in the controller if the {@link FeatureUpdate} has the allowDowngrade flag set - setting this + * flag conveys user intent to attempt downgrade of a feature max version level. Note that + * despite the allowDowngrade flag being set, certain downgrades may be rejected by the + * controller if it is deemed impossible. + * Deletion of a finalized feature version is not a regular operation/intent. 
It could be + * done by setting the allowDowngrade flag to true in the {@link FeatureUpdate}, and, setting + * the max version level to be less than 1. + * + * + * The following exceptions can be anticipated when calling {@code get()} on the futures + * obtained from the returned {@link UpdateFeaturesResult}: + * + * {@link org.apache.kafka.common.errors.ClusterAuthorizationException} + * If the authenticated user didn't have alter access to the cluster. + * {@link org.apache.kafka.common.errors.InvalidRequestException} Review comment: Answered below. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
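As a rough illustration of how the admin API described in the javadoc above would be used, here is a hedged sketch of a describe-then-update call. The method and option names follow the KIP-584 discussion in this PR, but since the API surface was still under review here, treat the exact signatures (DescribeFeaturesOptions, the FeatureUpdate constructor, updateFeatures) and the feature name "my_feature" as assumptions rather than the final API.

```scala
import java.util.Collections
import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, FeatureUpdate, UpdateFeaturesOptions}

def bumpFeature(admin: Admin): Unit = {
  // Describe finalized and supported features, optionally routed to the controller
  // for a strongly consistent read.
  val described = admin.describeFeatures(new DescribeFeaturesOptions().sendRequestToController(true))
  println(described.featureMetadata().get())

  // Attempt to raise the max version level of a hypothetical feature "my_feature" to 2.
  // allowDowngrade = false, so the controller rejects any downgrade attempt.
  val update = new FeatureUpdate(2.toShort, false)
  val result = admin.updateFeatures(Collections.singletonMap("my_feature", update), new UpdateFeaturesOptions())
  result.values().get("my_feature").get() // throws if the controller returned an error for this feature
}
```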
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r465572011 ## File path: clients/src/main/resources/common/message/UpdateFeaturesResponse.json ## @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 50, + "type": "response", + "name": "UpdateFeaturesResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ Review comment: I don't see that we consistently use a top level error code across other Kafka apis, so I will leave it as it is. It feels OK for this api to not use it, as it does not make a significant difference. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Issue Comment Deleted] (KAFKA-10095) In LogCleanerManagerTest replace get().nonEmpty call with contains
[ https://issues.apache.org/jira/browse/KAFKA-10095?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Reuhl updated KAFKA-10095: - Comment: was deleted (was: {{I'm a newbie to kafka and would like to pick up the issue. One idea would be to change the code to actually checking that the topicPartition is _contained_ in the cleaner checkpoints:}} {code:java} cleanerManager.setCleaningState(topicPartition, LogCleaningInProgress) cleanerManager.doneCleaning(topicPartition, log.dir, 1) assertTrue(cleanerManager.cleaningState(topicPartition).isEmpty) assertTrue(cleanerManager.allCleanerCheckpoints.contains(topicPartition)) cleanerManager.setCleaningState(topicPartition, LogCleaningAborted) cleanerManager.doneCleaning(topicPartition, log.dir, 1) assertEquals(LogCleaningPaused(1), cleanerManager.cleaningState(topicPartition).get) assertTrue(cleanerManager.allCleanerCheckpoints.contains(topicPartition)) {code} ) > In LogCleanerManagerTest replace get().nonEmpty call with contains > -- > > Key: KAFKA-10095 > URL: https://issues.apache.org/jira/browse/KAFKA-10095 > Project: Kafka > Issue Type: Improvement > Components: log cleaner, unit tests >Reporter: Jakob Homan >Assignee: Sarah Gonsalves >Priority: Trivial > Labels: newbie > > n.b. This is a newbie ticket designed to be an introduction to contributing > for the assignee. > In kafka.log.LogCleanerManagerTest we have two calls to > .get(something).nonEmpty, which is equivalent to .contains(something). We > should simplify these calls. > {code}cleanerManager.doneCleaning(topicPartition, log.dir, 1) > assertTrue(cleanerManager.cleaningState(topicPartition).isEmpty) > > assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).nonEmpty) > cleanerManager.setCleaningState(topicPartition, LogCleaningAborted) > cleanerManager.doneCleaning(topicPartition, log.dir, 1) > assertEquals(LogCleaningPaused(1), > cleanerManager.cleaningState(topicPartition).get) > > assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).nonEmpty){code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
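For anyone new to the Scala collections involved, a tiny self-contained example of the equivalence the ticket relies on; the TopicPartition key is just for flavour, and the same holds for any Map key.

{code:java}
import org.apache.kafka.common.TopicPartition

val tp = new TopicPartition("topic", 0)
val checkpoints: Map[TopicPartition, Long] = Map(tp -> 42L)

// For a Map, get(k).nonEmpty and contains(k) are equivalent; contains states the intent directly.
assert(checkpoints.get(tp).nonEmpty)
assert(checkpoints.contains(tp))
{code}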
[jira] [Created] (KAFKA-10363) Broker try to connect to a new cluster when there are changes in zookeeper.connect properties
Alexey Kornev created KAFKA-10363: - Summary: Broker try to connect to a new cluster when there are changes in zookeeper.connect properties Key: KAFKA-10363 URL: https://issues.apache.org/jira/browse/KAFKA-10363 Project: Kafka Issue Type: Bug Affects Versions: 2.3.1, 2.4.0 Environment: 3 Kafka brokers (v2.3.1, v2.4.0) with Zookeeper cluster (3.4.10) Ubuntu 18.04 LTS Reporter: Alexey Kornev We've just successfully set up a Kafka cluster consists of 3 brokers and faced with the following issue: when we change order of zookeeper servers in zookeeper.connect property in server.properties files and restart Kafka broker then this Kafka broker tries to connect to a new Kafka cluster. As a result, Kafka broker throws an error and shutdown. For example, config server.properties on first broker: {code:java} broker.id=-1 ... zookeeper.connect=node_1:2181/kafka,node_2:2181/kafka,node_3:2181/kafka {code} We changed it to {code:java} broker.id=-1 ... zookeeper.connect=node_2:2181/kafka,node_3:2181/kafka,node_1:2181/kafka {code} and restart Kafka broker. Logs: {code:java} [2020-08-05 09:07:55,658] INFO [ExpirationReaper-0-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2020-08-05 09:07:55,658] INFO [ExpirationReaper-0-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2020-08-05 09:07:55,658] INFO [ExpirationReaper-0-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2020-08-05 09:07:57,070] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)[2020-08-05 09:07:57,656] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)[2020-08-05 09:07:57,657] INFO starting (kafka.server.KafkaServer)[2020-08-05 09:07:57,658] INFO Connecting to zookeeper on node_2:2181/kafka,node_3:2181/kafka,node_1:2181/kafka (kafka.server.KafkaServer)[2020-08-05 09:07:57,685] INFO [ZooKeeperClient Kafka server] Initializing a new session to node_2:2181. 
(kafka.zookeeper.ZooKeeperClient)[2020-08-05 09:07:57,690] INFO Client environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf, built on 03/06/2019 16:18 GMT (org.apache.zookeeper.ZooKeeper)[2020-08-05 09:07:57,693] INFO Client environment:host.name=localhost (org.apache.zookeeper.ZooKeeper)[2020-08-05 09:07:57,693] INFO Client environment:java.version=11.0.8 (org.apache.zookeeper.ZooKeeper)[2020-08-05 09:07:57,696] INFO Client environment:java.vendor=Ubuntu (org.apache.zookeeper.ZooKeeper)[2020-08-05 09:07:57,696] INFO Client environment:java.home=/usr/lib/jvm/java-11-openjdk-amd64 (org.apache.zookeeper.ZooKeeper)[2020-08-05 09:07:57,696] INFO Client environment:java.class.path=/opt/kafka/current/bin/../libs/activation-1.1.1.jar:/opt/kafka/current/bin/../libs/aopalliance-repackaged-2.5.0.jar:/opt/kafka/current/bin/../libs/argparse4j-0.7.0.jar:/opt/kafka/current/bin/../libs/audience-annotations-0.5.0.jar:/opt/kafka/current/bin/../libs/commons-lang3-3.8.1.jar:/opt/kafka/current/bin/../libs/connect-api-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-basic-auth-extension-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-file-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-json-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-runtime-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-transforms-2.3.1.jar:/opt/kafka/current/bin/../libs/guava-20.0.jar:/opt/kafka/current/bin/../libs/hk2-api-2.5.0.jar:/opt/kafka/current/bin/../libs/hk2-locator-2.5.0.jar:/opt/kafka/current/bin/../libs/hk2-utils-2.5.0.jar:/opt/kafka/current/bin/../libs/jackson-annotations-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-core-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-databind-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-dataformat-csv-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-datatype-jdk8-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-jaxrs-base-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-jaxrs-json-provider-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-module-jaxb-annotations-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-module-paranamer-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-module-scala_2.12-2.10.0.jar:/opt/kafka/current/bin/../libs/jakarta.activation-api-1.2.1.jar:/opt/kafka/current/bin/../libs/jakarta.annotation-api-1.3.4.jar:/opt/kafka/current/bin/../libs/jakarta.inject-2.5.0.jar:/opt/kafka/current/bin/../libs/jakarta.ws.rs-api-2.1.5.jar:/opt/kafka/current/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/opt/kafka/current/bin/../libs/javassist-3.22.0-CR2.jar:/opt/kafka/current/bin/../libs/javax.servlet-api-3.1.0.jar:/opt/kafka/current/bin/../libs/javax.ws.rs-api-2.1.1.jar:/opt/kafka/current/bin/../libs/jaxb-api-2.3.0.jar:/opt/kafka/current/bin/../libs/jersey-client-2.28.jar:/opt/kafka/
[jira] [Updated] (KAFKA-10363) Broker try to connect to a new cluster when there are changes in zookeeper.connect properties
[ https://issues.apache.org/jira/browse/KAFKA-10363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Kornev updated KAFKA-10363: -- Description: We've just successfully set up a Kafka cluster consists of 3 brokers and faced with the following issue: when we change order of zookeeper servers in zookeeper.connect property in server.properties files and restart Kafka broker then this Kafka broker tries to connect to a new Kafka cluster. As a result, Kafka broker throws an error and shutdown. For example, config server.properties on first broker: {code:java} broker.id=-1 ... zookeeper.connect=node_1:2181/kafka,node_2:2181/kafka,node_3:2181/kafka {code} We changed it to {code:java} broker.id=-1 ... zookeeper.connect=node_2:2181/kafka,node_3:2181/kafka,node_1:2181/kafka {code} and restart Kafka broker. Logs: {code:java} [2020-08-05 09:07:55,658] INFO [ExpirationReaper-0-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2020-08-05 09:07:55,658] INFO [ExpirationReaper-0-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2020-08-05 09:07:55,658] INFO [ExpirationReaper-0-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2020-08-05 09:07:57,070] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)[2020-08-05 09:07:57,656] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)[2020-08-05 09:07:57,657] INFO starting (kafka.server.KafkaServer)[2020-08-05 09:07:57,658] INFO Connecting to zookeeper on node_2:2181/kafka,node_3:2181/kafka,node_1:2181/kafka (kafka.server.KafkaServer)[2020-08-05 09:07:57,685] INFO [ZooKeeperClient Kafka server] Initializing a new session to node_2:2181. 
(kafka.zookeeper.ZooKeeperClient)[2020-08-05 09:07:57,690] INFO Client environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf, built on 03/06/2019 16:18 GMT (org.apache.zookeeper.ZooKeeper)[2020-08-05 09:07:57,693] INFO Client environment:host.name=localhost (org.apache.zookeeper.ZooKeeper)[2020-08-05 09:07:57,693] INFO Client environment:java.version=11.0.8 (org.apache.zookeeper.ZooKeeper)[2020-08-05 09:07:57,696] INFO Client environment:java.vendor=Ubuntu (org.apache.zookeeper.ZooKeeper)[2020-08-05 09:07:57,696] INFO Client environment:java.home=/usr/lib/jvm/java-11-openjdk-amd64 (org.apache.zookeeper.ZooKeeper)[2020-08-05 09:07:57,696] INFO Client environment:java.class.path=/opt/kafka/current/bin/../libs/activation-1.1.1.jar:/opt/kafka/current/bin/../libs/aopalliance-repackaged-2.5.0.jar:/opt/kafka/current/bin/../libs/argparse4j-0.7.0.jar:/opt/kafka/current/bin/../libs/audience-annotations-0.5.0.jar:/opt/kafka/current/bin/../libs/commons-lang3-3.8.1.jar:/opt/kafka/current/bin/../libs/connect-api-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-basic-auth-extension-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-file-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-json-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-runtime-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-transforms-2.3.1.jar:/opt/kafka/current/bin/../libs/guava-20.0.jar:/opt/kafka/current/bin/../libs/hk2-api-2.5.0.jar:/opt/kafka/current/bin/../libs/hk2-locator-2.5.0.jar:/opt/kafka/current/bin/../libs/hk2-utils-2.5.0.jar:/opt/kafka/current/bin/../libs/jackson-annotations-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-core-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-databind-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-dataformat-csv-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-datatype-jdk8-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-jaxrs-base-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-jaxrs-json-provider-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-module-jaxb-annotations-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-module-paranamer-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-module-scala_2.12-2.10.0.jar:/opt/kafka/current/bin/../libs/jakarta.activation-api-1.2.1.jar:/opt/kafka/current/bin/../libs/jakarta.annotation-api-1.3.4.jar:/opt/kafka/current/bin/../libs/jakarta.inject-2.5.0.jar:/opt/kafka/current/bin/../libs/jakarta.ws.rs-api-2.1.5.jar:/opt/kafka/current/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/opt/kafka/current/bin/../libs/javassist-3.22.0-CR2.jar:/opt/kafka/current/bin/../libs/javax.servlet-api-3.1.0.jar:/opt/kafka/current/bin/../libs/javax.ws.rs-api-2.1.1.jar:/opt/kafka/current/bin/../libs/jaxb-api-2.3.0.jar:/opt/kafka/current/bin/../libs/jersey-client-2.28.jar:/opt/kafka/current/bin/../libs/jersey-common-2.28.jar:/opt/kafka/current/bin/../libs/jersey-container-servlet-2.28.jar:/opt/kafka/current/bin/../libs/jersey-container-servlet-core-2.28.jar:/opt/kafka/current/bin/../libs/jersey-hk2-2.28.jar:/opt/kafka/current/bin/../libs/jersey-media-jaxb-2.28.jar:/opt/kafka/current/bin/../libs/jersey
[GitHub] [kafka] cmccabe commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
cmccabe commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r465708837 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -1212,17 +1221,32 @@ class Partition(val topicPartition: TopicPartition, private def expandIsr(newIsr: Set[Int]): Unit = { val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.toList, zkVersion) -val zkVersionOpt = stateStore.expandIsr(controllerEpoch, newLeaderAndIsr) -maybeUpdateIsrAndVersion(newIsr, zkVersionOpt) +//val zkVersionOpt = stateStore.expandIsr(controllerEpoch, newLeaderAndIsr) +//maybeUpdateIsrAndVersion(newIsr, zkVersionOpt) +alterIsrChannelManager.enqueueIsrUpdate(AlterIsrItem(topicPartition, newLeaderAndIsr, handleIsrUpdate)) +alterIsrInFlight = true + +info("ISR updated to [%s]".format(newIsr.mkString(","))) +inSyncReplicaIds = newIsr } private[cluster] def shrinkIsr(newIsr: Set[Int]): Unit = { val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.toList, zkVersion) -val zkVersionOpt = stateStore.shrinkIsr(controllerEpoch, newLeaderAndIsr) -maybeUpdateIsrAndVersion(newIsr, zkVersionOpt) +//val zkVersionOpt = stateStore.shrinkIsr(controllerEpoch, newLeaderAndIsr) +//maybeUpdateIsrAndVersion(newIsr, zkVersionOpt) +alterIsrChannelManager.enqueueIsrUpdate(AlterIsrItem(topicPartition, newLeaderAndIsr, handleIsrUpdate)) +alterIsrInFlight = true } - private[cluster] def maybeUpdateIsrAndVersion(isr: Set[Int], zkVersionOpt: Option[Int]): Unit = { + private def handleIsrUpdate(error: Errors): Unit = { +error match { + case NONE => println(s"Controller accepted ISR for $topicPartition") + case _ => println(s"Had an error sending ISR for $topicPartition : $error") +} +alterIsrInFlight = false + } Review comment: It seems like we need to set the `inSyncReplicaIds` here, since we don't do it in `shrinkIsr`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
cmccabe commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r465711962 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -1771,6 +1775,127 @@ class KafkaController(val config: KafkaConfig, } } + // TODO is it okay to pull message classes down into the controller? + def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: AlterIsrResponseData => Unit): Unit = { +//val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(alterIsrRequest.brokerId()) +/*if (brokerEpochOpt.isEmpty) { + info(s"Ignoring AlterIsr due to unknown broker ${alterIsrRequest.brokerId()}") + // TODO is INVALID_REQUEST a reasonable error here? + callback.apply(new AlterIsrResponseData().setErrorCode(Errors.INVALID_REQUEST.code)) + return +} + +if (!brokerEpochOpt.contains(alterIsrRequest.brokerEpoch())) { + info(s"Ignoring AlterIsr due to stale broker epoch ${alterIsrRequest.brokerEpoch()} for broker ${alterIsrRequest.brokerId()}") + callback.apply(new AlterIsrResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code)) + return +}*/ + +val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]() + +val resp = new AlterIsrResponseData() +resp.setTopics(new util.ArrayList()) + +alterIsrRequest.topics().forEach(topicReq => { + val topicResp = new AlterIsrResponseTopics() +.setName(topicReq.name()) +.setPartitions(new util.ArrayList()) + resp.topics().add(topicResp) + + topicReq.partitions().forEach(partitionReq => { +val partitionResp = new AlterIsrResponsePartitions() + .setPartitionIndex(partitionReq.partitionIndex()) +topicResp.partitions().add(partitionResp) + +// For each partition who's ISR we are altering, let's do some upfront validation for the broker response Review comment: This is also a concurrency bug since you can't access stuff like the controllerContext except from the controller thread itself (it would be multi-threaded access without a lock) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dongjinleekr commented on pull request #8117: KAFKA-8403: Suppress needs a Materialized variant
dongjinleekr commented on pull request #8117: URL: https://github.com/apache/kafka/pull/8117#issuecomment-669182284 Hello. I updated the [KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-508%3A+Make+Suppression+State+Queriable) and rebased the PR onto the latest trunk. Please have a look when you are free. cc/ @vvcephei @mjsax This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
cmccabe commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r465712943 ## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala ## @@ -113,8 +113,8 @@ class BrokerToControllerChannelManager(metadataCache: kafka.server.MetadataCache brokerToControllerListenerName, time, threadName) } - private[server] def sendRequest(request: AbstractRequest.Builder[_ <: AbstractRequest], - callback: RequestCompletionHandler): Unit = { + def sendRequest(request: AbstractRequest.Builder[_ <: AbstractRequest], + callback: RequestCompletionHandler): Unit = { requestQueue.put(BrokerToControllerQueueItem(request, callback)) Review comment: This also needs to call `NetworkClient#wake` in case we are blocking inside `NetworkClient#poll` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
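Below is a generic sketch of the enqueue-then-wake pattern the reviewer is asking for, kept independent of the PR's classes: the `RequestEnqueuer` name and its shape are illustrative, and only `KafkaClient#wakeup` is existing client API.

```scala
import java.util.concurrent.LinkedBlockingQueue
import org.apache.kafka.clients.KafkaClient

// Illustrative only: queue a request for the I/O thread, then wake the client so a thread
// parked inside NetworkClient#poll picks up the new item instead of waiting for the poll timeout.
class RequestEnqueuer[T](networkClient: KafkaClient) {
  private val queue = new LinkedBlockingQueue[T]()

  def enqueue(item: T): Unit = {
    queue.put(item)
    networkClient.wakeup()
  }

  def drain(): java.util.List[T] = {
    val drained = new java.util.ArrayList[T]()
    queue.drainTo(drained)
    drained
  }
}
```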
[GitHub] [kafka] cmccabe commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
cmccabe commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r465715770 ## File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala ## @@ -0,0 +1,121 @@ +package kafka.server Review comment: It would be good to find a better name for this. When I read "AlterIsrChannelManager" I assumed it had its own separate channel, rather than using the ControllerChannelManager. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dongjinleekr commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2
dongjinleekr commented on pull request #7898: URL: https://github.com/apache/kafka/pull/7898#issuecomment-669229785 Here is the KIP - [KIP-653: Upgrade log4j to log4j2](https://cwiki.apache.org/confluence/display/KAFKA/KIP-653%3A+Upgrade+log4j+to+log4j2) @omkreddy Could you have a look? I think you would be the perfect reviewer [for this feature](https://github.com/omkreddy/log4j2-kafka-appender). :smile: This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r465756356 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,391 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.HashSet; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer initializer; +private final Aggregator aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { +this.windows = windows; +this.storeName = storeName; +this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { +return windows; +} + +@Override +public void enableSendingOldValues() { +sendOldValues = true; +} + +private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor { +private TimestampedWindowStore windowStore; +private TimestampedTupleForwarder, Agg> tupleForwarder; +private StreamsMetricsImpl 
metrics; +private InternalProcessorContext internalProcessorContext; +private Sensor lateRecordDropSensor; +private Sensor droppedRecordsSensor; +private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; +private boolean reverseIteratorImplemented = false; + +@SuppressWarnings("unchecked") +@Override +public void init(final ProcessorContext context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext) context; +metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( +threadId, +context.taskId().toString(), +internalProcessorContext.currentNode().name(), +metrics +); +//catch unsupported operation error +droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); +windowStore = (Timesta
[GitHub] [kafka] ijuma commented on pull request #9129: MINOR: Update jmh to 1.24 for async profiler support
ijuma commented on pull request #9129: URL: https://github.com/apache/kafka/pull/9129#issuecomment-669221375 Actually, it looks like I need to improve the jmh script to make this easier to use. Will also update the README. So, please don't review it yet. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r465743922 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -1771,6 +1775,127 @@ class KafkaController(val config: KafkaConfig, } } + // TODO is it okay to pull message classes down into the controller? + def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: AlterIsrResponseData => Unit): Unit = { +//val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(alterIsrRequest.brokerId()) +/*if (brokerEpochOpt.isEmpty) { + info(s"Ignoring AlterIsr due to unknown broker ${alterIsrRequest.brokerId()}") + // TODO is INVALID_REQUEST a reasonable error here? + callback.apply(new AlterIsrResponseData().setErrorCode(Errors.INVALID_REQUEST.code)) + return +} + +if (!brokerEpochOpt.contains(alterIsrRequest.brokerEpoch())) { + info(s"Ignoring AlterIsr due to stale broker epoch ${alterIsrRequest.brokerEpoch()} for broker ${alterIsrRequest.brokerId()}") + callback.apply(new AlterIsrResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code)) + return +}*/ + +val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]() + +val resp = new AlterIsrResponseData() +resp.setTopics(new util.ArrayList()) + +alterIsrRequest.topics().forEach(topicReq => { + val topicResp = new AlterIsrResponseTopics() +.setName(topicReq.name()) +.setPartitions(new util.ArrayList()) + resp.topics().add(topicResp) + + topicReq.partitions().forEach(partitionReq => { +val partitionResp = new AlterIsrResponsePartitions() + .setPartitionIndex(partitionReq.partitionIndex()) +topicResp.partitions().add(partitionResp) + +// For each partition who's ISR we are altering, let's do some upfront validation for the broker response Review comment: @cmccabe good to know about `controllerContext` 👍 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r465744271 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) { return new AlterClientQuotasResult(Collections.unmodifiableMap(futures)); } +@Override +public DescribeUserScramCredentialsResult describeUserScramCredentials(List users, DescribeUserScramCredentialsOptions options) { +final KafkaFutureImpl> future = new KafkaFutureImpl<>(); +final long now = time.milliseconds(); +Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()), +new ControllerNodeProvider()) { +@Override +public DescribeUserScramCredentialsRequest.Builder createRequest(int timeoutMs) { +return new DescribeUserScramCredentialsRequest.Builder( +new DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user -> +new DescribeUserScramCredentialsRequestData.UserName().setName(user)).collect(Collectors.toList(; +} + +@Override +public void handleResponse(AbstractResponse abstractResponse) { +DescribeUserScramCredentialsResponse response = (DescribeUserScramCredentialsResponse) abstractResponse; +Errors error = Errors.forCode(response.data().error()); +switch (error) { +case NONE: +DescribeUserScramCredentialsResponseData data = response.data(); + future.complete(data.userScramCredentials().stream().collect(Collectors.toMap( + DescribeUserScramCredentialsResponseData.UserScramCredential::name, +userScramCredential -> { +List scramCredentialInfos = userScramCredential.credentialInfos().stream().map( +credentialInfo -> new ScramCredentialInfo(ScramMechanism.from(credentialInfo.mechanism()), credentialInfo.iterations())) +.collect(Collectors.toList()); +return new UserScramCredentialsDescription(userScramCredential.name(), scramCredentialInfos); +}))); +break; +case NOT_CONTROLLER: +handleNotControllerError(error); +break; +default: +future.completeExceptionally(new ApiError(error, response.data().errorMessage()).exception()); +break; +} +} + +@Override +void handleFailure(Throwable throwable) { +future.completeExceptionally(throwable); +} +}; +runnable.call(call, now); +return new DescribeUserScramCredentialsResult(future); +} + +@Override +public AlterUserScramCredentialsResult alterUserScramCredentials(List alterations, + AlterUserScramCredentialsOptions options) { +final long now = time.milliseconds(); +final Map> futures = new HashMap<>(); +for (UserScramCredentialAlteration alteration: alterations) { +futures.put(alteration.getUser(), new KafkaFutureImpl<>()); +} +final Map userIllegalAlterationExceptions = new HashMap<>(); +// We need to keep track of users with deletions of an unknown SCRAM mechanism +alterations.stream().filter(a -> a instanceof UserScramCredentialDeletion).forEach(alteration -> { +UserScramCredentialDeletion deletion = (UserScramCredentialDeletion) alteration; +ScramMechanism mechanism = deletion.getMechanism(); +if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) { +userIllegalAlterationExceptions.put(deletion.getUser(), new IllegalArgumentException("Unknown SCRAM mechanism")); Review comment: How about `InvalidRequestException`? It's already used in this class, and it might be more appropriate than `InvalidConfigurationException`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] johnthotekat commented on pull request #9120: KAFKA-10316: Consider renaming getter method for Interactive Queries
johnthotekat commented on pull request #9120: URL: https://github.com/apache/kafka/pull/9120#issuecomment-669241684 @mjsax One of the test failed for me in local, This one - KafkaAdminClientTest#testMetadataRetries. It's unrelated to the changes in this PR. Rest of the tests PASSED without any issues. I guess its the same issue we are addressing with https://issues.apache.org/jira/browse/KAFKA-10311 ? ``` org.apache.kafka.clients.admin.KafkaAdminClientTest > testMetadataRetries FAILED java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, deadlineMs=1596638067201, tries=1, nextAllowedTryMs=1596638067302) timed out at 1596638067202 after 1 attempt(s) at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) at org.apache.kafka.clients.admin.KafkaAdminClientTest.testMetadataRetries(KafkaAdminClientTest.java:997) Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, deadlineMs=1596638067201, tries=1, nextAllowedTryMs=1596638067302) timed out at 1596638067202 after 1 attempt(s) Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r465746162 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -1771,6 +1776,141 @@ class KafkaController(val config: KafkaConfig, } } + def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: AlterIsrResponseData => Unit): Unit = { +val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]() + +alterIsrRequest.topics().forEach(topicReq => topicReq.partitions().forEach(partitionReq => { Review comment: It might be simpler just to use AlterIsrRequestData and AlterIsrResponseData throughout this code (rather than converting to `Map[TopicPartition, LeaderAndIsr]` and `Map[TopicPartition, Errors]`) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma opened a new pull request #9129: MINOR: Update jmh to 1.24 for async profiler support
ijuma opened a new pull request #9129: URL: https://github.com/apache/kafka/pull/9129 Highlights: * async-profiler integration. Can be used with -prof async; pass -prof async:help to list the accepted options. * perf c2c integration. Can be used with -prof perfc2c, if available. Full details: https://mail.openjdk.java.net/pipermail/jmh-dev/2020-August/002982.html ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
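[Editorial note] As a rough illustration of how the new profiler hook is invoked (not part of the PR), here is a minimal, self-contained JMH benchmark; the package, class name, and run commands in the comments are assumptions for the sketch, and the accepted async-profiler options should be checked with -prof async:help.
```java
// Minimal JMH benchmark sketch; package and class names are hypothetical.
package org.apache.kafka.jmh.example;

import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;

@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class ExampleBenchmark {

    @Benchmark
    public long sumSmallLoop() {
        // Trivial workload so the sketch stays self-contained; the real Kafka benchmarks
        // live under jmh-benchmarks/ in the source tree.
        long sum = 0;
        for (int i = 0; i < 1_000; i++) {
            sum += i;
        }
        return sum;
    }

    // With JMH 1.24+, the built benchmark jar can be run with the async-profiler hook,
    // e.g. (jar name and options depend on the local setup and are assumptions here):
    //   java -jar benchmarks.jar ExampleBenchmark -prof async
    //   java -jar benchmarks.jar ExampleBenchmark -prof async:help
}
```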
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r465748504 ## File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala ## @@ -0,0 +1,121 @@ +package kafka.server Review comment: Yea, maybe just "AlterIsrManager"? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r465748120 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -1771,6 +1776,141 @@ class KafkaController(val config: KafkaConfig, } } + def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: AlterIsrResponseData => Unit): Unit = { +val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]() + +alterIsrRequest.topics().forEach(topicReq => topicReq.partitions().forEach(partitionReq => { + val tp = new TopicPartition(topicReq.name(), partitionReq.partitionIndex()) + val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt) + isrsToAlter.put(tp, new LeaderAndIsr(partitionReq.leaderId(), partitionReq.leaderEpoch(), newIsr, partitionReq.currentIsrVersion())) +})) + +def responseCallback(results: Either[Map[TopicPartition, Errors], Errors]): Unit = { + val resp = new AlterIsrResponseData() + results match { +case Right(error) => + resp.setErrorCode(error.code()) +case Left(partitions: Map[TopicPartition, Errors]) => + resp.setTopics(new util.ArrayList()) + partitions.groupBy(_._1.topic()).foreachEntry((topic, partitionMap) => { +val topicResp = new AlterIsrResponseTopics() + .setName(topic) + .setPartitions(new util.ArrayList()) +resp.topics().add(topicResp) +partitionMap.foreachEntry((partition, error) => { + topicResp.partitions().add( +new AlterIsrResponsePartitions() + .setPartitionIndex(partition.partition()) + .setErrorCode(error.code())) +}) + }) + } + callback.apply(resp) +} + +eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId(), alterIsrRequest.brokerEpoch(), isrsToAlter, responseCallback)) + } + + private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr], + callback: AlterIsrCallback): Unit = { +if (!isActive) { + callback.apply(Right(Errors.NOT_CONTROLLER)) + return +} + +val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId) +if (brokerEpochOpt.isEmpty) { + info(s"Ignoring AlterIsr due to unknown broker $brokerId") + callback.apply(Right(Errors.STALE_BROKER_EPOCH)) + return +} + +if (!brokerEpochOpt.contains(brokerEpoch)) { + info(s"Ignoring AlterIsr due to stale broker epoch $brokerEpoch for broker $brokerId") + callback.apply(Right(Errors.STALE_BROKER_EPOCH)) + return +} + +val partitionErrors: mutable.Map[TopicPartition, Errors] = mutable.HashMap[TopicPartition, Errors]() + +val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = isrsToAlter.flatMap { + case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) => +val partitionError: Errors = controllerContext.partitionLeadershipInfo(tp) match { + case Some(leaderIsrAndControllerEpoch) => +val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr +if (newLeaderAndIsr.leader != currentLeaderAndIsr.leader) { + Errors.NOT_LEADER_OR_FOLLOWER +} else if (newLeaderAndIsr.leaderEpoch < currentLeaderAndIsr.leaderEpoch) { + Errors.FENCED_LEADER_EPOCH +} else { + val currentAssignment = controllerContext.partitionReplicaAssignment(tp) + if (!newLeaderAndIsr.isr.forall(replicaId => currentAssignment.contains(replicaId))) { +warn(s"Some of the proposed ISR are not in the assignment for partition $tp. Proposed ISR=$newLeaderAndIsr.isr assignment=$currentAssignment") +Errors.INVALID_REQUEST + } else if (!newLeaderAndIsr.isr.forall(replicaId => controllerContext.isReplicaOnline(replicaId, tp))) { +warn(s"Some of the proposed ISR are offline for partition $tp. 
Proposed ISR=$newLeaderAndIsr.isr") +Errors.INVALID_REQUEST + } else { +Errors.NONE + } +} + case None => Errors.UNKNOWN_TOPIC_OR_PARTITION +} +if (partitionError == Errors.NONE) { + // Bump the leaderEpoch for partitions that we're going to write + Some(tp -> newLeaderAndIsr.newEpochAndZkVersion) +} else { + partitionErrors.put(tp, partitionError) + None +} +} + +// Do the updates in ZK +info(s"Updating ISRs for partitions: ${adjustedIsrs.keySet}.") +val UpdateLeaderAndIsrResult(finishedUpdates, badVersionUpdates) = zkClient.updateLeaderAndIsr( + adjustedIsrs, controllerContext.epoch, controllerContext.epochZkVersion) + +val successfulUpdates: Map[TopicPartition, LeaderAndIsr] = finishedUpdates.flatMap { + case (partition
[jira] [Commented] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics
[ https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17171519#comment-17171519 ] Ryanne Dolan commented on KAFKA-10339: -- I think what you mean is that a read-process-write loop cannot span clusters, since a transaction coordinator on one cluster cannot commit offsets on another cluster. But I don't think we actually need that -- we can just store offsets on the target cluster instead. I think what we need is something along these lines: - we manage offsets ourselves -- we don't rely on Connect's internal offsets tracking or __consumer_offsets on the source cluster. - we only write to the target cluster. - offsets are stored on the target cluster using a "fake" consumer group. I say "fake" because there would be no actual records being consumed by the group, just offsets being stored in __consumer_offsets topic. - we write all records in a transaction, just as the KIP currently describes. - in addition, we call addOffsetsToTransaction in order to commit offsets to the "fake" consumer group on the target cluster. - when MirrorSourceTask starts, it loads initial offsets from __consumer_offsets on the target cluster. Result: - if the transaction succeeds, the __consumer_offsets topic on the target cluster is updated. - if the transaction aborts, all data records are dropped, and the __consumer_offsets topic is not updated. - when MirrorSourceTask starts/restarts, it resumes at the last committed offsets, as recorded in the target cluster. Thoughts? > MirrorMaker2 Exactly-once Semantics > --- > > Key: KAFKA-10339 > URL: https://issues.apache.org/jira/browse/KAFKA-10339 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Reporter: Ning Zhang >Assignee: Ning Zhang >Priority: Major > Labels: needs-kip > > MirrorMaker2 is currently implemented on Kafka Connect Framework, more > specifically the Source Connector / Task, which do not provide exactly-once > semantics (EOS) out-of-the-box, as discussed in > https://github.com/confluentinc/kafka-connect-jdbc/issues/461, > https://github.com/apache/kafka/pull/5553, > https://issues.apache.org/jira/browse/KAFKA-6080 and > https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 > currently does not provide EOS. -- This message was sent by Atlassian Jira (v8.3.4#803005)
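[Editorial note] To make the proposal above concrete, here is a minimal sketch of the read-from-source / transactionally-write-to-target flow using the plain Java clients rather than the Connect framework; the bootstrap servers, topic, transactional.id, and the "fake" group name are placeholder assumptions, and restart/offset-recovery handling is reduced to a comment.
```java
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

public class TargetClusterOffsetsSketch {
    public static void main(String[] args) {
        // Consumer reads from the *source* cluster; auto-commit is off because progress
        // is tracked in the target cluster's __consumer_offsets instead.
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "source-cluster:9092");   // assumption
        consumerProps.put("group.id", "mm2-reader");                      // assumption
        consumerProps.put("enable.auto.commit", "false");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        // Producer writes to the *target* cluster, transactionally.
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "target-cluster:9092");    // assumption
        producerProps.put("transactional.id", "mm2-task-0");              // assumption
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
            // On startup a real task would first read the last committed offsets of the
            // "fake" group from the target cluster and seek the source consumer there.
            consumer.subscribe(Collections.singletonList("events"));      // source topic, assumption
            producer.initTransactions();

            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
            if (!records.isEmpty()) {
                producer.beginTransaction();
                Map<TopicPartition, OffsetAndMetadata> sourceOffsets = new HashMap<>();
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    producer.send(new ProducerRecord<>("events", record.key(), record.value()));
                    sourceOffsets.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1));
                }
                // Source-cluster positions land in the target cluster's __consumer_offsets under
                // a "fake" group, inside the same transaction as the mirrored records, so either
                // both commit or both are dropped.
                producer.sendOffsetsToTransaction(sourceOffsets, "mm2-fake-group"); // group name is an assumption
                producer.commitTransaction();
            }
        }
    }
}
```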
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r465749546 ## File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala ## @@ -0,0 +1,132 @@ +package kafka.server + +import java.util +import java.util.concurrent.{ScheduledFuture, TimeUnit} +import java.util.concurrent.atomic.AtomicLong + +import kafka.api.LeaderAndIsr +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.{Logging, Scheduler} +import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics} +import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * Handles the sending of AlterIsr requests to the controller. Updating the ISR is an asynchronous operation, + * so partitions will learn about updates through LeaderAndIsr messages sent from the controller + */ +trait AlterIsrChannelManager { + val IsrChangePropagationBlackOut = 5000L + val IsrChangePropagationInterval = 6L + + def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit + + def clearPending(topicPartition: TopicPartition): Unit + + def startup(): Unit + + def shutdown(): Unit +} + +case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr) Review comment: Should probably get rid of this and change the method to `enqueueIsrUpdate(TopicPartition, LeaderAndIsr)` ## File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala ## @@ -0,0 +1,132 @@ +package kafka.server + +import java.util +import java.util.concurrent.{ScheduledFuture, TimeUnit} +import java.util.concurrent.atomic.AtomicLong + +import kafka.api.LeaderAndIsr +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.{Logging, Scheduler} +import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics} +import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * Handles the sending of AlterIsr requests to the controller. Updating the ISR is an asynchronous operation, + * so partitions will learn about updates through LeaderAndIsr messages sent from the controller + */ +trait AlterIsrChannelManager { + val IsrChangePropagationBlackOut = 5000L + val IsrChangePropagationInterval = 6L + + def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit + + def clearPending(topicPartition: TopicPartition): Unit + + def startup(): Unit + + def shutdown(): Unit +} + +case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr) Review comment: Should probably get rid of this and change the method to ```enqueueIsrUpdate(TopicPartition, LeaderAndIsr)``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r465750445 ## File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala ## @@ -0,0 +1,132 @@ +package kafka.server + +import java.util +import java.util.concurrent.{ScheduledFuture, TimeUnit} +import java.util.concurrent.atomic.AtomicLong + +import kafka.api.LeaderAndIsr +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.{Logging, Scheduler} +import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics} +import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * Handles the sending of AlterIsr requests to the controller. Updating the ISR is an asynchronous operation, + * so partitions will learn about updates through LeaderAndIsr messages sent from the controller + */ +trait AlterIsrChannelManager { + val IsrChangePropagationBlackOut = 5000L + val IsrChangePropagationInterval = 6L + + def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit + + def clearPending(topicPartition: TopicPartition): Unit + + def startup(): Unit + + def shutdown(): Unit +} + +case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr) + +class AlterIsrChannelManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager, + val zkClient: KafkaZkClient, + val scheduler: Scheduler, + val brokerId: Int, + val brokerEpoch: Long) extends AlterIsrChannelManager with Logging with KafkaMetricsGroup { + + private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = new mutable.HashMap[TopicPartition, AlterIsrItem]() + private val lastIsrChangeMs = new AtomicLong(0) + private val lastIsrPropagationMs = new AtomicLong(0) + + @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None + + override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = { +pendingIsrUpdates synchronized { + pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem + lastIsrChangeMs.set(System.currentTimeMillis()) + // Rather than sending right away, we'll delay at most 50ms to allow for batching of ISR changes happening + // in fast succession + if (scheduledRequest.isEmpty) { +scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS)) + } +} + } + + override def clearPending(topicPartition: TopicPartition): Unit = { +pendingIsrUpdates synchronized { + // when we get a new LeaderAndIsr, we clear out any pending requests + pendingIsrUpdates.remove(topicPartition) +} + } + + override def startup(): Unit = { +controllerChannelManager.start() + } + + override def shutdown(): Unit = { +controllerChannelManager.shutdown() + } + + private def propagateIsrChanges(): Unit = { +val now = System.currentTimeMillis() +pendingIsrUpdates synchronized { + if (pendingIsrUpdates.nonEmpty) { +// Max ISRs to send? 
+val message = new AlterIsrRequestData() + .setBrokerId(brokerId) + .setBrokerEpoch(brokerEpoch) + .setTopics(new util.ArrayList()) + + pendingIsrUpdates.values.groupBy(_.topicPartition.topic()).foreachEntry((topic, items) => { + val topicPart = new AlterIsrRequestTopics() +.setName(topic) +.setPartitions(new util.ArrayList()) + message.topics().add(topicPart) + items.foreach(item => { +topicPart.partitions().add(new AlterIsrRequestPartitions() + .setPartitionIndex(item.topicPartition.partition()) + .setLeaderId(item.leaderAndIsr.leader) + .setLeaderEpoch(item.leaderAndIsr.leaderEpoch) + .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava) + .setCurrentIsrVersion(item.leaderAndIsr.zkVersion) +) + }) +}) + +def responseHandler(response: ClientResponse): Unit = { + println(response.responseBody().toString(response.requestHeader().apiVersion())) Review comment: Remove this This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r465750632 ## File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala ## @@ -0,0 +1,132 @@ +package kafka.server + +import java.util +import java.util.concurrent.{ScheduledFuture, TimeUnit} +import java.util.concurrent.atomic.AtomicLong + +import kafka.api.LeaderAndIsr +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.{Logging, Scheduler} +import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics} +import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * Handles the sending of AlterIsr requests to the controller. Updating the ISR is an asynchronous operation, + * so partitions will learn about updates through LeaderAndIsr messages sent from the controller + */ +trait AlterIsrChannelManager { + val IsrChangePropagationBlackOut = 5000L + val IsrChangePropagationInterval = 6L + + def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit + + def clearPending(topicPartition: TopicPartition): Unit + + def startup(): Unit + + def shutdown(): Unit +} + +case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr) + +class AlterIsrChannelManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager, + val zkClient: KafkaZkClient, + val scheduler: Scheduler, + val brokerId: Int, + val brokerEpoch: Long) extends AlterIsrChannelManager with Logging with KafkaMetricsGroup { + + private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = new mutable.HashMap[TopicPartition, AlterIsrItem]() + private val lastIsrChangeMs = new AtomicLong(0) + private val lastIsrPropagationMs = new AtomicLong(0) + + @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None + + override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = { +pendingIsrUpdates synchronized { + pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem + lastIsrChangeMs.set(System.currentTimeMillis()) + // Rather than sending right away, we'll delay at most 50ms to allow for batching of ISR changes happening + // in fast succession + if (scheduledRequest.isEmpty) { +scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS)) + } +} + } + + override def clearPending(topicPartition: TopicPartition): Unit = { +pendingIsrUpdates synchronized { + // when we get a new LeaderAndIsr, we clear out any pending requests + pendingIsrUpdates.remove(topicPartition) +} + } + + override def startup(): Unit = { +controllerChannelManager.start() + } + + override def shutdown(): Unit = { +controllerChannelManager.shutdown() + } + + private def propagateIsrChanges(): Unit = { +val now = System.currentTimeMillis() +pendingIsrUpdates synchronized { + if (pendingIsrUpdates.nonEmpty) { +// Max ISRs to send? 
+val message = new AlterIsrRequestData() + .setBrokerId(brokerId) + .setBrokerEpoch(brokerEpoch) + .setTopics(new util.ArrayList()) + + pendingIsrUpdates.values.groupBy(_.topicPartition.topic()).foreachEntry((topic, items) => { + val topicPart = new AlterIsrRequestTopics() +.setName(topic) +.setPartitions(new util.ArrayList()) + message.topics().add(topicPart) + items.foreach(item => { +topicPart.partitions().add(new AlterIsrRequestPartitions() + .setPartitionIndex(item.topicPartition.partition()) + .setLeaderId(item.leaderAndIsr.leader) + .setLeaderEpoch(item.leaderAndIsr.leaderEpoch) + .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava) + .setCurrentIsrVersion(item.leaderAndIsr.zkVersion) +) + }) +}) + +def responseHandler(response: ClientResponse): Unit = { + println(response.responseBody().toString(response.requestHeader().apiVersion())) + val body: AlterIsrResponse = response.responseBody().asInstanceOf[AlterIsrResponse] + val data: AlterIsrResponseData = body.data() + Errors.forCode(data.errorCode()) match { +case Errors.NONE => info(s"Controller handled AlterIsr request") +case e: Errors => warn(s"Controller returned an error when handling AlterIsr request:
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r465751005 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) { return new AlterClientQuotasResult(Collections.unmodifiableMap(futures)); } +@Override +public DescribeUserScramCredentialsResult describeUserScramCredentials(List users, DescribeUserScramCredentialsOptions options) { +final KafkaFutureImpl> future = new KafkaFutureImpl<>(); +final long now = time.milliseconds(); +Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()), +new ControllerNodeProvider()) { +@Override +public DescribeUserScramCredentialsRequest.Builder createRequest(int timeoutMs) { +return new DescribeUserScramCredentialsRequest.Builder( +new DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user -> +new DescribeUserScramCredentialsRequestData.UserName().setName(user)).collect(Collectors.toList(; +} + +@Override +public void handleResponse(AbstractResponse abstractResponse) { +DescribeUserScramCredentialsResponse response = (DescribeUserScramCredentialsResponse) abstractResponse; +Errors error = Errors.forCode(response.data().error()); +switch (error) { +case NONE: +DescribeUserScramCredentialsResponseData data = response.data(); + future.complete(data.userScramCredentials().stream().collect(Collectors.toMap( + DescribeUserScramCredentialsResponseData.UserScramCredential::name, +userScramCredential -> { +List scramCredentialInfos = userScramCredential.credentialInfos().stream().map( +credentialInfo -> new ScramCredentialInfo(ScramMechanism.from(credentialInfo.mechanism()), credentialInfo.iterations())) +.collect(Collectors.toList()); +return new UserScramCredentialsDescription(userScramCredential.name(), scramCredentialInfos); +}))); +break; +case NOT_CONTROLLER: +handleNotControllerError(error); +break; +default: +future.completeExceptionally(new ApiError(error, response.data().errorMessage()).exception()); +break; +} +} + +@Override +void handleFailure(Throwable throwable) { +future.completeExceptionally(throwable); +} +}; +runnable.call(call, now); +return new DescribeUserScramCredentialsResult(future); +} + +@Override +public AlterUserScramCredentialsResult alterUserScramCredentials(List alterations, + AlterUserScramCredentialsOptions options) { +final long now = time.milliseconds(); +final Map> futures = new HashMap<>(); +for (UserScramCredentialAlteration alteration: alterations) { +futures.put(alteration.getUser(), new KafkaFutureImpl<>()); +} +final Map userIllegalAlterationExceptions = new HashMap<>(); +// We need to keep track of users with deletions of an unknown SCRAM mechanism +alterations.stream().filter(a -> a instanceof UserScramCredentialDeletion).forEach(alteration -> { +UserScramCredentialDeletion deletion = (UserScramCredentialDeletion) alteration; +ScramMechanism mechanism = deletion.getMechanism(); +if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) { +userIllegalAlterationExceptions.put(deletion.getUser(), new IllegalArgumentException("Unknown SCRAM mechanism")); +} +}); +// Creating an upsertion may throw InvalidKeyException or NoSuchAlgorithmException, +// so keep track of which users are affected by such a failure and immediately fail all their alterations +final Map> userInsertions = new HashMap<>(); +alterations.stream().filter(a -> a instanceof UserScramCredentialUpsertion) +.filter(alteration -> 
!userIllegalAlterationExceptions.containsKey(alteration.getUser())) +.forEach(alteration -> { +UserScramCredentialUpsertion upsertion = (UserScramCredentialUpsertion) alteration; +String user = upsertion.getUser(); +try { +Scr
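[Editorial note] For orientation while reading the diff, this is roughly how the KIP-554 admin calls being implemented above are meant to be used from application code; the broker address, user name, password, and iteration count are made-up placeholders, and the exact result-class accessors may shift before the API is finalized.
```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ScramCredentialInfo;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.clients.admin.UserScramCredentialDeletion;
import org.apache.kafka.clients.admin.UserScramCredentialUpsertion;

public class ScramAdminSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");                 // placeholder

        try (Admin admin = Admin.create(props)) {
            // Upsert a SCRAM-SHA-256 credential for "alice" and drop her SCRAM-SHA-512 one.
            admin.alterUserScramCredentials(Arrays.asList(
                    new UserScramCredentialUpsertion("alice",
                            new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 8192), "alice-secret"),
                    new UserScramCredentialDeletion("alice", ScramMechanism.SCRAM_SHA_512)))
                .all().get();

            // Describe what is now stored for "alice"; only mechanism and iteration count
            // are returned, never the credential bytes themselves.
            admin.describeUserScramCredentials(Arrays.asList("alice"))
                .all().get()
                .forEach((user, description) ->
                        System.out.println(user + " -> " + description.credentialInfos()));
        }
    }
}
```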
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r465751597 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,391 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.HashSet; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer initializer; +private final Aggregator aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { +this.windows = windows; +this.storeName = storeName; +this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { +return windows; +} + +@Override +public void enableSendingOldValues() { +sendOldValues = true; +} + +private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor { +private TimestampedWindowStore windowStore; +private TimestampedTupleForwarder, Agg> tupleForwarder; +private StreamsMetricsImpl 
metrics; +private InternalProcessorContext internalProcessorContext; +private Sensor lateRecordDropSensor; +private Sensor droppedRecordsSensor; +private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; +private boolean reverseIteratorImplemented = false; + +@SuppressWarnings("unchecked") +@Override +public void init(final ProcessorContext context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext) context; +metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( +threadId, +context.taskId().toString(), +internalProcessorContext.currentNode().name(), +metrics +); +//catch unsupported operation error +droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); +windowStore = (Timesta
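[Editorial note] As a usage-level companion to the processor code under review, here is a small sketch of how the KIP-450 sliding-window DSL is expected to look from an application; topic names and serdes are placeholders, and the factory method shape follows the KIP proposal at the time of this PR.
```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class SlidingWindowSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Long> events = builder.stream("events",                 // placeholder topic
                Consumed.with(Serdes.String(), Serdes.Long()));

        // Count records per key over a 5-minute sliding window with a 1-minute grace period.
        KTable<Windowed<String>, Long> counts = events
                .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(
                        Duration.ofMinutes(5), Duration.ofMinutes(1)))
                .count();

        // Flatten the windowed key for output; a real topology would pick serdes deliberately.
        counts.toStream((windowedKey, count) -> windowedKey.key() + "@" + windowedKey.window().start())
                .to("event-counts", Produced.with(Serdes.String(), Serdes.Long()));  // placeholder topic

        builder.build();
    }
}
```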
[GitHub] [kafka] hachikuji opened a new pull request #9130: [DRAFT] Kafka Raft Implementation (KIP-595)
hachikuji opened a new pull request #9130: URL: https://github.com/apache/kafka/pull/9130 This is a draft of the changes needed for KIP-595. The draft tag will be removed in the upcoming weeks if/when the KIP is adopted. Note that there are still a few significant protocol differences from the documented proposal that we are still in the process of reconciling. Co-authored-by: Boyang Chen Co-authored-by: Guozhang Wang ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on pull request #7222: KAFKA-8806 Reduce calls to validateOffsetsIfNeeded
mumrah commented on pull request #7222: URL: https://github.com/apache/kafka/pull/7222#issuecomment-669259920 Reviving this PR cc @hachikuji @ijuma @andrewchoi5 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-10334) Transactions not working properly
[ https://issues.apache.org/jira/browse/KAFKA-10334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luis Araujo updated KAFKA-10334: Description: I'm using transactions provided by Kafka Producer API in a Scala project built with SBT. The dependency used in the project is: {code:java} "org.apache.kafka" % "kafka-clients" % "2.1.0" {code} I followed the documentation and I was expecting that transactions fail when I call *.commitTransaction* if some problem is raised when sending a message like it's described in the documentation: [https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html ] Unfortunately, when testing this behaviour using a message larger than the size accepted by the Kafka broker/cluster, the transactions are not working properly. I tested with a 3 Kafka broker cluster with 1MB message max size (default value): - when the message has 1MB, the transaction is aborted and an exception is raised when calling *commitTransaction()* - when the message is bigger than 1MB, the transaction is completed successfully *without* the message being written. No exception is thrown. As an example, this means that when I produce 9 messages with 1 KB and 1 message with 1.1MB in the same transaction, the transaction is completed but only 9 messages are written to the Kafka cluster. I tested this behaviour with Kafka version 2.1.0 and 2.3.0 in both Kafka cluster and Kafka Producer API. was: I'm using transactions provided by Kafka Producer API in a Scala project built with SBT. The dependency used in the project is: "org.apache.kafka" % "kafka-clients" % "2.1.0" I followed the documentation and I was expecting that transactions fail when I call .commitTransaction if some problem is raised when sending a message like it's described in the documentation: [https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html ] Unfortunatelly, when testing this behaviour using a message larger than the size accepted by the Kafka broker/cluster, the transactions are not working properly. I tested with a 3 Kafka broker cluster with 1MB message max size (default value): - when the message has 1MB, the transaction is aborted and an exception is raised when calling commitTransaction() - when the message is bigger than 1MB, the transaction is completed successfully without the message being written. no exception is thrown. As an example, this means that when I produce 9 messages with 1 KB and 1 message with 1.1MB in the same transaction, the transaction is completed but only 9 messages are written to the Kafka cluster. I tested this behaviour with Kafka version 2.1.0 and 2.3.0 in both Kafka cluster and Kafka Producer API. > Transactions not working properly > - > > Key: KAFKA-10334 > URL: https://issues.apache.org/jira/browse/KAFKA-10334 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 2.1.0, 2.3.0 >Reporter: Luis Araujo >Priority: Major > > I'm using transactions provided by Kafka Producer API in a Scala project > built with SBT. 
The dependency used in the project is: > {code:java} > "org.apache.kafka" % "kafka-clients" % "2.1.0" {code} > I followed the documentation and I was expecting that transactions fail when > I call *.commitTransaction* if some problem is raised when sending a message > like it's described in the documentation: > [https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html > ] > Unfortunately, when testing this behaviour using a message larger than the > size accepted by the Kafka broker/cluster, the transactions are not working > properly. > I tested with a 3 Kafka broker cluster with 1MB message max size (default > value): > - when the message has 1MB, the transaction is aborted and an exception is > raised when calling *commitTransaction()* > - when the message is bigger than 1MB, the transaction is completed > successfully *without* the message being written. No exception is thrown. > As an example, this means that when I produce 9 messages with 1 KB and 1 > message with 1.1MB in the same transaction, the transaction is completed but > only 9 messages are written to the Kafka cluster. > I tested this behaviour with Kafka version 2.1.0 and 2.3.0 in both Kafka > cluster and Kafka Producer API. -- This message was sent by Atlassian Jira (v8.3.4#803005)
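[Editorial note] For readers of this ticket, the transactional pattern the reporter is relying on (adapted from the KafkaProducer javadoc) looks roughly like the sketch below; the broker address, topic, and payload sizes are placeholders, and the point of the report is that commitTransaction() does not fail as expected when the oversized send is rejected.
```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;

public class TransactionalProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");                 // placeholder
        props.put("transactional.id", "tx-demo");                         // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            try {
                producer.beginTransaction();
                for (int i = 0; i < 9; i++) {
                    producer.send(new ProducerRecord<>("tx-topic", "key-" + i, new byte[1024]));
                }
                // ~1.1 MB record, larger than the default 1 MB message size limit.
                producer.send(new ProducerRecord<>("tx-topic", "big", new byte[1_100_000]));
                // Expectation from the javadoc: a failed send poisons the transaction,
                // so commitTransaction() throws and nothing in the batch becomes visible.
                producer.commitTransaction();
            } catch (KafkaException e) {
                // Abortable error path; fatal errors (e.g. producer fenced) would instead
                // require closing the producer.
                producer.abortTransaction();
            }
        }
    }
}
```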
[jira] [Updated] (KAFKA-10334) Transactions not working properly
[ https://issues.apache.org/jira/browse/KAFKA-10334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luis Araujo updated KAFKA-10334: Description: I'm using transactions provided by Kafka Producer API in a Scala project built with SBT. The dependency used in the project is: {code:java} "org.apache.kafka" % "kafka-clients" % "2.1.0" {code} I followed the documentation and I was expecting that transactions fail when I call *.commitTransaction* if some problem is raised when sending a message like it's described in the [documentation|[http://example.com|https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]]. Unfortunately, when testing this behaviour using a message larger than the size accepted by the Kafka broker/cluster, the transactions are not working properly. I tested with a 3 Kafka broker cluster with 1MB message max size (default value): - when the message has 1MB, the transaction is aborted and an exception is raised when calling *commitTransaction()* - when the message is bigger than 1MB, the transaction is completed successfully *without* the message being written. No exception is thrown. As an example, this means that when I produce 9 messages with 1 KB and 1 message with 1.1MB in the same transaction, the transaction is completed but only 9 messages are written to the Kafka cluster. I tested this behaviour with Kafka version 2.1.0 and 2.3.0 in both Kafka cluster and Kafka Producer API. was: I'm using transactions provided by Kafka Producer API in a Scala project built with SBT. The dependency used in the project is: {code:java} "org.apache.kafka" % "kafka-clients" % "2.1.0" {code} I followed the documentation and I was expecting that transactions fail when I call *.commitTransaction* if some problem is raised when sending a message like it's described in the documentation: [https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html ] Unfortunately, when testing this behaviour using a message larger than the size accepted by the Kafka broker/cluster, the transactions are not working properly. I tested with a 3 Kafka broker cluster with 1MB message max size (default value): - when the message has 1MB, the transaction is aborted and an exception is raised when calling *commitTransaction()* - when the message is bigger than 1MB, the transaction is completed successfully *without* the message being written. No exception is thrown. As an example, this means that when I produce 9 messages with 1 KB and 1 message with 1.1MB in the same transaction, the transaction is completed but only 9 messages are written to the Kafka cluster. I tested this behaviour with Kafka version 2.1.0 and 2.3.0 in both Kafka cluster and Kafka Producer API. > Transactions not working properly > - > > Key: KAFKA-10334 > URL: https://issues.apache.org/jira/browse/KAFKA-10334 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 2.1.0, 2.3.0 >Reporter: Luis Araujo >Priority: Major > > I'm using transactions provided by Kafka Producer API in a Scala project > built with SBT. The dependency used in the project is: > {code:java} > "org.apache.kafka" % "kafka-clients" % "2.1.0" {code} > I followed the documentation and I was expecting that transactions fail when > I call *.commitTransaction* if some problem is raised when sending a message > like it's described in the > [documentation|[http://example.com|https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]]. 
> Unfortunately, when testing this behaviour using a message larger than the > size accepted by the Kafka broker/cluster, the transactions are not working > properly. > I tested with a 3 Kafka broker cluster with 1MB message max size (default > value): > - when the message has 1MB, the transaction is aborted and an exception is > raised when calling *commitTransaction()* > - when the message is bigger than 1MB, the transaction is completed > successfully *without* the message being written. No exception is thrown. > As an example, this means that when I produce 9 messages with 1 KB and 1 > message with 1.1MB in the same transaction, the transaction is completed but > only 9 messages are written to the Kafka cluster. > I tested this behaviour with Kafka version 2.1.0 and 2.3.0 in both Kafka > cluster and Kafka Producer API. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10334) Transactions not working properly
[ https://issues.apache.org/jira/browse/KAFKA-10334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luis Araujo updated KAFKA-10334: Description: I'm using transactions provided by Kafka Producer API in a Scala project built with SBT. The dependency used in the project is: {code:java} "org.apache.kafka" % "kafka-clients" % "2.1.0" {code} I followed the documentation and I was expecting that transactions fail when I call *.commitTransaction* if some problem is raised when sending a message like it's described in the [documentation|[https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]] [link title|http://example.com] Unfortunately, when testing this behaviour using a message larger than the size accepted by the Kafka broker/cluster, the transactions are not working properly. I tested with a 3 Kafka broker cluster with 1MB message max size (default value): - when the message has 1MB, the transaction is aborted and an exception is raised when calling *commitTransaction()* - when the message is bigger than 1MB, the transaction is completed successfully *without* the message being written. No exception is thrown. As an example, this means that when I produce 9 messages with 1 KB and 1 message with 1.1MB in the same transaction, the transaction is completed but only 9 messages are written to the Kafka cluster. I tested this behaviour with Kafka version 2.1.0 and 2.3.0 in both Kafka cluster and Kafka Producer API. was: I'm using transactions provided by Kafka Producer API in a Scala project built with SBT. The dependency used in the project is: {code:java} "org.apache.kafka" % "kafka-clients" % "2.1.0" {code} I followed the documentation and I was expecting that transactions fail when I call *.commitTransaction* if some problem is raised when sending a message like it's described in the [documentation|[https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]] Unfortunately, when testing this behaviour using a message larger than the size accepted by the Kafka broker/cluster, the transactions are not working properly. I tested with a 3 Kafka broker cluster with 1MB message max size (default value): - when the message has 1MB, the transaction is aborted and an exception is raised when calling *commitTransaction()* - when the message is bigger than 1MB, the transaction is completed successfully *without* the message being written. No exception is thrown. As an example, this means that when I produce 9 messages with 1 KB and 1 message with 1.1MB in the same transaction, the transaction is completed but only 9 messages are written to the Kafka cluster. I tested this behaviour with Kafka version 2.1.0 and 2.3.0 in both Kafka cluster and Kafka Producer API. > Transactions not working properly > - > > Key: KAFKA-10334 > URL: https://issues.apache.org/jira/browse/KAFKA-10334 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 2.1.0, 2.3.0 >Reporter: Luis Araujo >Priority: Major > > I'm using transactions provided by Kafka Producer API in a Scala project > built with SBT. 
The dependency used in the project is: > {code:java} > "org.apache.kafka" % "kafka-clients" % "2.1.0" {code} > I followed the documentation and I was expecting that transactions fail when > I call *.commitTransaction* if some problem is raised when sending a message > like it's described in the > [documentation|[https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]] > [link title|http://example.com] > Unfortunately, when testing this behaviour using a message larger than the > size accepted by the Kafka broker/cluster, the transactions are not working > properly. > I tested with a 3 Kafka broker cluster with 1MB message max size (default > value): > - when the message has 1MB, the transaction is aborted and an exception is > raised when calling *commitTransaction()* > - when the message is bigger than 1MB, the transaction is completed > successfully *without* the message being written. No exception is thrown. > As an example, this means that when I produce 9 messages with 1 KB and 1 > message with 1.1MB in the same transaction, the transaction is completed but > only 9 messages are written to the Kafka cluster. > I tested this behaviour with Kafka version 2.1.0 and 2.3.0 in both Kafka > cluster and Kafka Producer API. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10334) Transactions not working properly
[ https://issues.apache.org/jira/browse/KAFKA-10334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luis Araujo updated KAFKA-10334: Description: I'm using transactions provided by Kafka Producer API in a Scala project built with SBT. The dependency used in the project is: {code:java} "org.apache.kafka" % "kafka-clients" % "2.1.0" {code} I followed the documentation and I was expecting that transactions fail when I call *.commitTransaction* if some problem is raised when sending a message like it's described in the [documentation|[https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]] Unfortunately, when testing this behaviour using a message larger than the size accepted by the Kafka broker/cluster, the transactions are not working properly. I tested with a 3 Kafka broker cluster with 1MB message max size (default value): - when the message has 1MB, the transaction is aborted and an exception is raised when calling *commitTransaction()* - when the message is bigger than 1MB, the transaction is completed successfully *without* the message being written. No exception is thrown. As an example, this means that when I produce 9 messages with 1 KB and 1 message with 1.1MB in the same transaction, the transaction is completed but only 9 messages are written to the Kafka cluster. I tested this behaviour with Kafka version 2.1.0 and 2.3.0 in both Kafka cluster and Kafka Producer API. was: I'm using transactions provided by Kafka Producer API in a Scala project built with SBT. The dependency used in the project is: {code:java} "org.apache.kafka" % "kafka-clients" % "2.1.0" {code} I followed the documentation and I was expecting that transactions fail when I call *.commitTransaction* if some problem is raised when sending a message like it's described in the [documentation|[http://example.com|https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]]. Unfortunately, when testing this behaviour using a message larger than the size accepted by the Kafka broker/cluster, the transactions are not working properly. I tested with a 3 Kafka broker cluster with 1MB message max size (default value): - when the message has 1MB, the transaction is aborted and an exception is raised when calling *commitTransaction()* - when the message is bigger than 1MB, the transaction is completed successfully *without* the message being written. No exception is thrown. As an example, this means that when I produce 9 messages with 1 KB and 1 message with 1.1MB in the same transaction, the transaction is completed but only 9 messages are written to the Kafka cluster. I tested this behaviour with Kafka version 2.1.0 and 2.3.0 in both Kafka cluster and Kafka Producer API. > Transactions not working properly > - > > Key: KAFKA-10334 > URL: https://issues.apache.org/jira/browse/KAFKA-10334 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 2.1.0, 2.3.0 >Reporter: Luis Araujo >Priority: Major > > I'm using transactions provided by Kafka Producer API in a Scala project > built with SBT. 
The dependency used in the project is: > {code:java} > "org.apache.kafka" % "kafka-clients" % "2.1.0" {code} > I followed the documentation and I was expecting that transactions fail when > I call *.commitTransaction* if some problem is raised when sending a message > like it's described in the > [documentation|[https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]] > Unfortunately, when testing this behaviour using a message larger than the > size accepted by the Kafka broker/cluster, the transactions are not working > properly. > I tested with a 3 Kafka broker cluster with 1MB message max size (default > value): > - when the message has 1MB, the transaction is aborted and an exception is > raised when calling *commitTransaction()* > - when the message is bigger than 1MB, the transaction is completed > successfully *without* the message being written. No exception is thrown. > As an example, this means that when I produce 9 messages with 1 KB and 1 > message with 1.1MB in the same transaction, the transaction is completed but > only 9 messages are written to the Kafka cluster. > I tested this behaviour with Kafka version 2.1.0 and 2.3.0 in both Kafka > cluster and Kafka Producer API. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10334) Transactions not working properly
[ https://issues.apache.org/jira/browse/KAFKA-10334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luis Araujo updated KAFKA-10334: Description: I'm using transactions provided by Kafka Producer API in a Scala project built with SBT. The dependency used in the project is: {code:java} "org.apache.kafka" % "kafka-clients" % "2.1.0" {code} I followed the documentation and I was expecting that transactions fail when I call *.commitTransaction* if some problem is raised when sending a message like it's described in the [documentation|[https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]] [documentation|https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html] Unfortunately, when testing this behaviour using a message larger than the size accepted by the Kafka broker/cluster, the transactions are not working properly. I tested with a 3 Kafka broker cluster with 1MB message max size (default value): - when the message has 1MB, the transaction is aborted and an exception is raised when calling *commitTransaction()* - when the message is bigger than 1MB, the transaction is completed successfully *without* the message being written. No exception is thrown. As an example, this means that when I produce 9 messages with 1 KB and 1 message with 1.1MB in the same transaction, the transaction is completed but only 9 messages are written to the Kafka cluster. I tested this behaviour with Kafka version 2.1.0 and 2.3.0 in both Kafka cluster and Kafka Producer API. was: I'm using transactions provided by Kafka Producer API in a Scala project built with SBT. The dependency used in the project is: {code:java} "org.apache.kafka" % "kafka-clients" % "2.1.0" {code} I followed the documentation and I was expecting that transactions fail when I call *.commitTransaction* if some problem is raised when sending a message like it's described in the [documentation|[https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]] [link title|http://example.com] Unfortunately, when testing this behaviour using a message larger than the size accepted by the Kafka broker/cluster, the transactions are not working properly. I tested with a 3 Kafka broker cluster with 1MB message max size (default value): - when the message has 1MB, the transaction is aborted and an exception is raised when calling *commitTransaction()* - when the message is bigger than 1MB, the transaction is completed successfully *without* the message being written. No exception is thrown. As an example, this means that when I produce 9 messages with 1 KB and 1 message with 1.1MB in the same transaction, the transaction is completed but only 9 messages are written to the Kafka cluster. I tested this behaviour with Kafka version 2.1.0 and 2.3.0 in both Kafka cluster and Kafka Producer API. > Transactions not working properly > - > > Key: KAFKA-10334 > URL: https://issues.apache.org/jira/browse/KAFKA-10334 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 2.1.0, 2.3.0 >Reporter: Luis Araujo >Priority: Major > > I'm using transactions provided by Kafka Producer API in a Scala project > built with SBT. 
The dependency used in the project is: > {code:java} > "org.apache.kafka" % "kafka-clients" % "2.1.0" {code} > I followed the documentation and I was expecting that transactions fail when > I call *.commitTransaction* if some problem is raised when sending a message > like it's described in the > [documentation|[https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]] > > [documentation|https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html] > Unfortunately, when testing this behaviour using a message larger than the > size accepted by the Kafka broker/cluster, the transactions are not working > properly. > I tested with a 3 Kafka broker cluster with 1MB message max size (default > value): > - when the message has 1MB, the transaction is aborted and an exception is > raised when calling *commitTransaction()* > - when the message is bigger than 1MB, the transaction is completed > successfully *without* the message being written. No exception is thrown. > As an example, this means that when I produce 9 messages with 1 KB and 1 > message with 1.1MB in the same transaction, the transaction is completed but > only 9 messages are written to the Kafka cluster. > I tested this behaviour with Kafka version 2.1.0 and 2.3.0 in both Kafka > cluster and Kafka Producer API. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10334) Transactions not working properly
[ https://issues.apache.org/jira/browse/KAFKA-10334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luis Araujo updated KAFKA-10334: Description: I'm using transactions provided by Kafka Producer API in a Scala project built with SBT. The dependency used in the project is: {code:java} "org.apache.kafka" % "kafka-clients" % "2.1.0" {code} I followed the documentation and I was expecting that transactions fail when I call *.commitTransaction* if some problem is raised when sending a message like it's described in the [documentation.|https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html] Unfortunately, when testing this behaviour using a message larger than the size accepted by the Kafka broker/cluster, the transactions are not working properly. I tested with a 3 Kafka broker cluster with 1MB message max size (default value): - when the message has 1MB, the transaction is aborted and an exception is raised when calling *commitTransaction()* - when the message is bigger than 1MB, the transaction is completed successfully *without* the message being written. No exception is thrown. As an example, this means that when I produce 9 messages with 1 KB and 1 message with 1.1MB in the same transaction, the transaction is completed but only 9 messages are written to the Kafka cluster. I tested this behaviour with Kafka version 2.1.0 and 2.3.0 in both Kafka cluster and Kafka Producer API. was: I'm using transactions provided by Kafka Producer API in a Scala project built with SBT. The dependency used in the project is: {code:java} "org.apache.kafka" % "kafka-clients" % "2.1.0" {code} I followed the documentation and I was expecting that transactions fail when I call *.commitTransaction* if some problem is raised when sending a message like it's described in the [documentation|[https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]] [documentation|https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html] Unfortunately, when testing this behaviour using a message larger than the size accepted by the Kafka broker/cluster, the transactions are not working properly. I tested with a 3 Kafka broker cluster with 1MB message max size (default value): - when the message has 1MB, the transaction is aborted and an exception is raised when calling *commitTransaction()* - when the message is bigger than 1MB, the transaction is completed successfully *without* the message being written. No exception is thrown. As an example, this means that when I produce 9 messages with 1 KB and 1 message with 1.1MB in the same transaction, the transaction is completed but only 9 messages are written to the Kafka cluster. I tested this behaviour with Kafka version 2.1.0 and 2.3.0 in both Kafka cluster and Kafka Producer API. > Transactions not working properly > - > > Key: KAFKA-10334 > URL: https://issues.apache.org/jira/browse/KAFKA-10334 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 2.1.0, 2.3.0 >Reporter: Luis Araujo >Priority: Major > > I'm using transactions provided by Kafka Producer API in a Scala project > built with SBT. 
The dependency used in the project is: > {code:java} > "org.apache.kafka" % "kafka-clients" % "2.1.0" {code} > I followed the documentation and I was expecting that transactions fail when > I call *.commitTransaction* if some problem is raised when sending a message > like it's described in the > [documentation.|https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html] > Unfortunately, when testing this behaviour using a message larger than the > size accepted by the Kafka broker/cluster, the transactions are not working > properly. > I tested with a 3 Kafka broker cluster with 1MB message max size (default > value): > - when the message has 1MB, the transaction is aborted and an exception is > raised when calling *commitTransaction()* > - when the message is bigger than 1MB, the transaction is completed > successfully *without* the message being written. No exception is thrown. > As an example, this means that when I produce 9 messages with 1 KB and 1 > message with 1.1MB in the same transaction, the transaction is completed but > only 9 messages are written to the Kafka cluster. > I tested this behaviour with Kafka version 2.1.0 and 2.3.0 in both Kafka > cluster and Kafka Producer API. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10334) Transactions not working properly
[ https://issues.apache.org/jira/browse/KAFKA-10334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luis Araujo updated KAFKA-10334: Description: I'm using transactions provided by Kafka Producer API in a Scala project built with SBT. The dependency used in the project is: {code:java} "org.apache.kafka" % "kafka-clients" % "2.1.0" {code} I followed the documentation and I was expecting that transactions fail when I call *.commitTransaction* if some problem is raised when sending a message like it's described in the [documentation|https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-]. Unfortunately, when testing this behaviour using a message larger than the size accepted by the Kafka broker/cluster, the transactions are not working properly. I tested with a 3 Kafka broker cluster with 1MB message max size (default value): - when the message has 1MB, the transaction is aborted and an exception is raised when calling *commitTransaction()* - when the message is bigger than 1MB, the transaction is completed successfully *without* the message being written. No exception is thrown. As an example, this means that when I produce 9 messages with 1 KB and 1 message with 1.1MB in the same transaction, the transaction is completed but only 9 messages are written to the Kafka cluster. I tested this behaviour with Kafka version 2.1.0 and 2.3.0 in both Kafka cluster and Kafka Producer API. was: I'm using transactions provided by Kafka Producer API in a Scala project built with SBT. The dependency used in the project is: {code:java} "org.apache.kafka" % "kafka-clients" % "2.1.0" {code} I followed the documentation and I was expecting that transactions fail when I call *.commitTransaction* if some problem is raised when sending a message like it's described in the [documentation.|https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html] Unfortunately, when testing this behaviour using a message larger than the size accepted by the Kafka broker/cluster, the transactions are not working properly. I tested with a 3 Kafka broker cluster with 1MB message max size (default value): - when the message has 1MB, the transaction is aborted and an exception is raised when calling *commitTransaction()* - when the message is bigger than 1MB, the transaction is completed successfully *without* the message being written. No exception is thrown. As an example, this means that when I produce 9 messages with 1 KB and 1 message with 1.1MB in the same transaction, the transaction is completed but only 9 messages are written to the Kafka cluster. I tested this behaviour with Kafka version 2.1.0 and 2.3.0 in both Kafka cluster and Kafka Producer API. > Transactions not working properly > - > > Key: KAFKA-10334 > URL: https://issues.apache.org/jira/browse/KAFKA-10334 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 2.1.0, 2.3.0 >Reporter: Luis Araujo >Priority: Major > > I'm using transactions provided by Kafka Producer API in a Scala project > built with SBT. 
The dependency used in the project is: > {code:java} > "org.apache.kafka" % "kafka-clients" % "2.1.0" {code} > I followed the documentation and I was expecting that transactions fail when > I call *.commitTransaction* if some problem is raised when sending a message > like it's described in the > [documentation|https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-]. > Unfortunately, when testing this behaviour using a message larger than the > size accepted by the Kafka broker/cluster, the transactions are not working > properly. > I tested with a 3 Kafka broker cluster with 1MB message max size (default > value): > - when the message has 1MB, the transaction is aborted and an exception is > raised when calling *commitTransaction()* > - when the message is bigger than 1MB, the transaction is completed > successfully *without* the message being written. No exception is thrown. > As an example, this means that when I produce 9 messages with 1 KB and 1 > message with 1.1MB in the same transaction, the transaction is completed but > only 9 messages are written to the Kafka cluster. > I tested this behaviour with Kafka version 2.1.0 and 2.3.0 in both Kafka > cluster and Kafka Producer API. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10334) Transactions not working properly
[ https://issues.apache.org/jira/browse/KAFKA-10334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17171586#comment-17171586 ] Luis Araujo commented on KAFKA-10334: - After creating this issue, I tested awaiting the future's completion and only then calling the *commitTransaction* function. Here is a similar example:
{code:java}
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
    producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i))).get();
producer.commitTransaction();
{code}
With this code, I get an exception when the message is larger than (but not equal to) 1MB when I call *.get* on the Future. > Transactions not working properly > - > > Key: KAFKA-10334 > URL: https://issues.apache.org/jira/browse/KAFKA-10334 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 2.1.0, 2.3.0 >Reporter: Luis Araujo >Priority: Major > > I'm using transactions provided by Kafka Producer API in a Scala project > built with SBT. The dependency used in the project is: > {code:java} > "org.apache.kafka" % "kafka-clients" % "2.1.0" {code} > I followed the documentation and I was expecting that transactions fail when > I call *.commitTransaction* if some problem is raised when sending a message > like it's described in the > [documentation|https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-]. > Unfortunately, when testing this behaviour using a message larger than the > size accepted by the Kafka broker/cluster, the transactions are not working > properly. > I tested with a 3 Kafka broker cluster with 1MB message max size (default > value): > - when the message has 1MB, the transaction is aborted and an exception is > raised when calling *commitTransaction()* > - when the message is bigger than 1MB, the transaction is completed > successfully *without* the message being written. No exception is thrown. > As an example, this means that when I produce 9 messages with 1 KB and 1 > message with 1.1MB in the same transaction, the transaction is completed but > only 9 messages are written to the Kafka cluster. > I tested this behaviour with Kafka version 2.1.0 and 2.3.0 in both Kafka > cluster and Kafka Producer API. -- This message was sent by Atlassian Jira (v8.3.4#803005)
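For reference, below is a minimal, self-contained sketch of the pattern described in the comment above: awaiting each send's Future inside the transaction so that per-record failures (for example, a record larger than the broker's maximum message size) surface before commitTransaction() is reached. The bootstrap servers, topic name and serializer choices are illustrative placeholders, not taken from the reporter's project.
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class AwaitedTransactionalSend {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "app"); // transactional.id as in the report

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            try {
                for (int i = 0; i < 10; i++) {
                    // Blocking on the Future makes a per-record failure visible here,
                    // before commitTransaction() is ever called.
                    producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i))).get();
                }
                producer.commitTransaction();
            } catch (Exception e) {
                // For fatal errors (e.g. ProducerFencedException) the producer should be
                // closed instead of aborted; omitted here for brevity.
                producer.abortTransaction();
                throw new RuntimeException("Transaction failed", e);
            }
        }
    }
}
{code}
The same failure is also delivered to the Callback form of send(); the point illustrated in this thread is simply that the error becomes observable before the commit.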
[jira] [Updated] (KAFKA-10334) Transactions not working properly
[ https://issues.apache.org/jira/browse/KAFKA-10334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luis Araujo updated KAFKA-10334: Description: I'm using transactions provided by Kafka Producer API in a Scala project built with SBT. The dependency used in the project is: {code:java} "org.apache.kafka" % "kafka-clients" % "2.1.0" {code} I followed the documentation and I was expecting that transactions fail when I call *.commitTransaction* if some problem is raised when sending a message like it's described in the [documentation|https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-]. Unfortunately, when testing this behaviour using a message larger than the size accepted by the Kafka broker/cluster, the transactions are not working properly. I tested with a 3 Kafka broker cluster with 1MB message max size (default value): - when the message has 1MB, the transaction is aborted and an exception is raised when calling *commitTransaction()* - when the message is bigger than 1MB, the transaction is completed successfully *without* the message being written. No exception is thrown. As an example, this means that when I produce 9 messages with 1 KB and 1 message with 1.1MB in the same transaction, the transaction is completed but only 9 messages are written to the Kafka cluster. I tested this behaviour with Kafka version 2.1.0 and 2.3.0 in both Kafka cluster and Kafka Producer API. The configs that I'm using to create the KafkaProducer in order to use transactions: {code:java} new Properties() { { put(BOOTSTRAP_SERVERS_CONFIG, "localhost:29092,localhost:29093,localhost:29094") put(ACKS_CONFIG, "-1") put(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1") put(KEY_SERIALIZER_CLASS_CONFIG, Class.forName(classOf[StringSerializer].getName)) put(VALUE_SERIALIZER_CLASS_CONFIG, Class.forName(classOf[ByteArraySerializer].getName)) put(CLIENT_ID_CONFIG, "app") put(TRANSACTIONAL_ID_CONFIG, "app") put(ENABLE_IDEMPOTENCE_CONFIG, "true") } } {code} was: I'm using transactions provided by Kafka Producer API in a Scala project built with SBT. The dependency used in the project is: {code:java} "org.apache.kafka" % "kafka-clients" % "2.1.0" {code} I followed the documentation and I was expecting that transactions fail when I call *.commitTransaction* if some problem is raised when sending a message like it's described in the [documentation|https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-]. Unfortunately, when testing this behaviour using a message larger than the size accepted by the Kafka broker/cluster, the transactions are not working properly. I tested with a 3 Kafka broker cluster with 1MB message max size (default value): - when the message has 1MB, the transaction is aborted and an exception is raised when calling *commitTransaction()* - when the message is bigger than 1MB, the transaction is completed successfully *without* the message being written. No exception is thrown. As an example, this means that when I produce 9 messages with 1 KB and 1 message with 1.1MB in the same transaction, the transaction is completed but only 9 messages are written to the Kafka cluster. I tested this behaviour with Kafka version 2.1.0 and 2.3.0 in both Kafka cluster and Kafka Producer API. 
> Transactions not working properly > - > > Key: KAFKA-10334 > URL: https://issues.apache.org/jira/browse/KAFKA-10334 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 2.1.0, 2.3.0 >Reporter: Luis Araujo >Priority: Major > > I'm using transactions provided by Kafka Producer API in a Scala project > built with SBT. The dependency used in the project is: > {code:java} > "org.apache.kafka" % "kafka-clients" % "2.1.0" {code} > I followed the documentation and I was expecting that transactions fail when > I call *.commitTransaction* if some problem is raised when sending a message > like it's described in the > [documentation|https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-]. > Unfortunately, when testing this behaviour using a message larger than the > size accepted by the Kafka broker/cluster, the transactions are not working > properly. > I tested with a 3 Kafka broker cluster with 1MB message max size (default > value): > - when the message has 1MB, the transaction is aborted and an exception is > raised when calling *commitTransaction()* > - when
[jira] [Comment Edited] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics
[ https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17171612#comment-17171612 ] Ning Zhang edited comment on KAFKA-10339 at 8/5/20, 4:44 PM: - thanks for the input. I think that sounds a working plan. Here is my follow-up thoughts _"when MirrorSourceTask starts, it loads initial offsets from __consumer_offsets on the target cluster."_ As the consumer is configured to pull data from source cluster, I am thinking we probably need to: (1) add a new API (called "Map loadOffsets()") to [SinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java] (2) in MirrorSinkTask.java, implement/override loadOffsets() to supply the consumer offsets loaded from target cluster. (3) in [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295], when initialize the consumer, if `task.loadOffsets()` returns non empty, use the returned offsets as the starting point. _"in addition, we call addOffsetsToTransaction in order to commit offsets to the "fake" consumer group on the target cluster."_ I fully agree with the "fake" consumer group on the target cluster. I am thinking if "addOffsetsToTransaction" has been taken care by *producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId);*? was (Author: yangguo1220): thanks for the input. I think that sounds a working plan. Here is my follow-up thoughts _"when MirrorSourceTask starts, it loads initial offsets from __consumer_offsets on the target cluster."_ As the consumer is configured to pull data from source cluster, I am thinking we probably need to: (1) add a new API (called "Map loadOffsets()") to [SinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java] (2) in MirrorSinkTask.java, implement/override loadOffsets() to supply the consumer offsets loaded from target cluster. (3) in [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295], when initialize the consumer, if `task.loadOffsets()` returns non empty, use the returned offsets as the starting point. _"in addition, we call addOffsetsToTransaction in order to commit offsets to the "fake" consumer group on the target cluster."_ I fully agree with the "fake" consumer group on the target cluster. I am thinking if "addOffsetsToTransaction" has been taken care by *producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId);*? > MirrorMaker2 Exactly-once Semantics > --- > > Key: KAFKA-10339 > URL: https://issues.apache.org/jira/browse/KAFKA-10339 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Reporter: Ning Zhang >Assignee: Ning Zhang >Priority: Major > Labels: needs-kip > > MirrorMaker2 is currently implemented on Kafka Connect Framework, more > specifically the Source Connector / Task, which do not provide exactly-once > semantics (EOS) out-of-the-box, as discussed in > https://github.com/confluentinc/kafka-connect-jdbc/issues/461, > https://github.com/apache/kafka/pull/5553, > https://issues.apache.org/jira/browse/KAFKA-6080 and > https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 > currently does not provide EOS. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics
[ https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17171612#comment-17171612 ] Ning Zhang commented on KAFKA-10339: Thanks for the input. I think that sounds like a workable plan. Here are my follow-up thoughts: _"when MirrorSourceTask starts, it loads initial offsets from __consumer_offsets on the target cluster."_ As the consumer is configured to pull data from the source cluster, I am thinking we probably need to: (1) add a new API (called "Map loadOffsets()") to [SinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java] (2) in MirrorSinkTask.java, implement/override loadOffsets() to supply the consumer offsets loaded from the target cluster. (3) in [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295], when initializing the consumer, if `task.loadOffsets()` returns non-empty, use the returned offsets as the starting point. _"in addition, we call addOffsetsToTransaction in order to commit offsets to the "fake" consumer group on the target cluster."_ I fully agree with the "fake" consumer group on the target cluster. I am wondering whether "addOffsetsToTransaction" is already taken care of by *producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId);*? > MirrorMaker2 Exactly-once Semantics > --- > > Key: KAFKA-10339 > URL: https://issues.apache.org/jira/browse/KAFKA-10339 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Reporter: Ning Zhang >Assignee: Ning Zhang >Priority: Major > Labels: needs-kip > > MirrorMaker2 is currently implemented on Kafka Connect Framework, more > specifically the Source Connector / Task, which do not provide exactly-once > semantics (EOS) out-of-the-box, as discussed in > https://github.com/confluentinc/kafka-connect-jdbc/issues/461, > https://github.com/apache/kafka/pull/5553, > https://issues.apache.org/jira/browse/KAFKA-6080 and > https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 > currently does not provide EOS. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics
[ https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17171612#comment-17171612 ] Ning Zhang edited comment on KAFKA-10339 at 8/5/20, 4:47 PM: - thanks for the input. I think that sounds a working plan. Here is my follow-up thoughts _"when MirrorSourceTask starts, it loads initial offsets from __consumer_offsets on the target cluster."_ As the consumer is configured to pull data from source cluster, I am thinking we probably need to: (1) add a new API (called "Map loadOffsets()") to [SinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java] (2) in MirrorSinkTask.java, implement/override loadOffsets() to supply the consumer offsets loaded from target cluster. (3) in [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295], when initialize the consumer, if `task.loadOffsets()` returns non empty, use the returned offsets for the consumer in WorkerSinkTask as the starting point. _"in addition, we call addOffsetsToTransaction in order to commit offsets to the "fake" consumer group on the target cluster."_ I fully agree with the "fake" consumer group on the target cluster. I am thinking if "addOffsetsToTransaction" has been taken care by *producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId);*? was (Author: yangguo1220): thanks for the input. I think that sounds a working plan. Here is my follow-up thoughts _"when MirrorSourceTask starts, it loads initial offsets from __consumer_offsets on the target cluster."_ As the consumer is configured to pull data from source cluster, I am thinking we probably need to: (1) add a new API (called "Map loadOffsets()") to [SinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java] (2) in MirrorSinkTask.java, implement/override loadOffsets() to supply the consumer offsets loaded from target cluster. (3) in [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295], when initialize the consumer, if `task.loadOffsets()` returns non empty, use the returned offsets as the starting point. _"in addition, we call addOffsetsToTransaction in order to commit offsets to the "fake" consumer group on the target cluster."_ I fully agree with the "fake" consumer group on the target cluster. I am thinking if "addOffsetsToTransaction" has been taken care by *producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId);*? > MirrorMaker2 Exactly-once Semantics > --- > > Key: KAFKA-10339 > URL: https://issues.apache.org/jira/browse/KAFKA-10339 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Reporter: Ning Zhang >Assignee: Ning Zhang >Priority: Major > Labels: needs-kip > > MirrorMaker2 is currently implemented on Kafka Connect Framework, more > specifically the Source Connector / Task, which do not provide exactly-once > semantics (EOS) out-of-the-box, as discussed in > https://github.com/confluentinc/kafka-connect-jdbc/issues/461, > https://github.com/apache/kafka/pull/5553, > https://issues.apache.org/jira/browse/KAFKA-6080 and > https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 > currently does not provide EOS. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] junrao commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
junrao commented on a change in pull request #8657: URL: https://github.com/apache/kafka/pull/8657#discussion_r465867406 ## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ## @@ -369,6 +370,32 @@ class GroupCoordinator(val brokerId: Int, } } + /** + * try to complete produce, fetch and delete requests if the HW of partition is incremented. Otherwise, we try to complete + * only delayed fetch requests. + * + * Noted that this method may hold multiple group locks so the caller should NOT hold any group lock + * in order to avoid deadlock + * @param topicPartitions a map contains the partition and a flag indicting whether the HWM has been changed + */ + private[group] def completeDelayedRequests(topicPartitions: Map[TopicPartition, LeaderHwChange]): Unit = +topicPartitions.foreach { + case (tp, leaderHWIncremented) => leaderHWIncremented match { +case LeaderHwIncremented => groupManager.replicaManager.completeDelayedRequests(tp) +case _ => groupManager.replicaManager.completeDelayedFetchRequests(tp) + } +} + + /** + * complete the delayed join requests associated to input group keys. + * + * Noted that delayedJoin itself uses a lock other than the group lock for DelayedOperation.maybeTryComplete() and Review comment: @chia7712 : Yes, that's a possibility. It adds some complexity to DelayedOperation. Another possibility is to have a special case to complete the delayed requests from groupManager.storeGroup() in GroupCoordinator.onCompleteJoin() in a separate thread. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #8964: KAFKA-9450: Decouple flushing state from commiting
guozhangwang commented on pull request #8964: URL: https://github.com/apache/kafka/pull/8964#issuecomment-669308138 System test green: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4093/ Ready for a final review and merge cc @mjsax This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dielhennr commented on a change in pull request #9101: KAFKA-10325: KIP-649 implementation
dielhennr commented on a change in pull request #9101: URL: https://github.com/apache/kafka/pull/9101#discussion_r465867372 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicConsumerConfig.java ## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.DynamicClientConfigUpdater; +import org.apache.kafka.clients.GroupRebalanceConfig; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.requests.DescribeConfigsRequest; +import org.apache.kafka.common.requests.DescribeConfigsResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; + +/** + * Handles the request and response of a dynamic client configuration update for the consumer + */ +public class DynamicConsumerConfig extends DynamicClientConfigUpdater { +/* Client to use */ +private ConsumerNetworkClient client; + +/* Configs to update */ +private GroupRebalanceConfig rebalanceConfig; + +/* Object to synchronize on when response is recieved */ +Object lock; Review comment: I think I forgot to remove this after starting to use atomics to track the interval and in-progress updates. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #9127: MINOR: fix HTML
guozhangwang commented on pull request #9127: URL: https://github.com/apache/kafka/pull/9127#issuecomment-669320718 LGTM! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #9127: MINOR: fix HTML
guozhangwang commented on pull request #9127: URL: https://github.com/apache/kafka/pull/9127#issuecomment-669320891 The kafka-site has already been updated, so yes you do need to have a separate PR for it :P This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)
dajac commented on a change in pull request #9114: URL: https://github.com/apache/kafka/pull/9114#discussion_r465886919 ## File path: clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java ## @@ -209,4 +212,95 @@ public void shouldReturnPresenceOfMetrics() { assertThat(sensor.hasMetrics(), is(true)); } + +@Test +public void testStrictQuotaEnforcementWithRate() { +final Time time = new MockTime(0, System.currentTimeMillis(), 0); +final Metrics metrics = new Metrics(time); +final Sensor sensor = metrics.sensor("sensor", new MetricConfig() +.quota(Quota.upperBound(2)) +.timeWindow(1, TimeUnit.SECONDS) +.samples(11)); +final MetricName metricName = metrics.metricName("rate", "test-group"); +assertTrue(sensor.add(metricName, new Rate())); +final KafkaMetric rateMetric = metrics.metric(metricName); + +// Recording a first value at T+0 to bring the avg rate to 3 which is already +// above the quota. +strictRecord(sensor, 30, time.milliseconds()); +assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1); + +// Theoretically, we should wait 5s to bring back the avg rate to the define quota: +// ((30 / 10) - 2) / 2 * 10 = 5s +time.sleep(5000); + +// But, recording a second value is rejected because the avg rate is still equal +// to 3 after 5s. +assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1); +assertThrows(QuotaViolationException.class, () -> strictRecord(sensor, 30, time.milliseconds())); + +metrics.close(); +} + +@Test +public void testStrictQuotaEnforcementWithTokenBucket() { +final Time time = new MockTime(0, System.currentTimeMillis(), 0); +final Metrics metrics = new Metrics(time); +final Sensor sensor = metrics.sensor("sensor", new MetricConfig() +.quota(Quota.upperBound(2)) +.timeWindow(1, TimeUnit.SECONDS) +.samples(11)); +final MetricName metricName = metrics.metricName("credits", "test-group"); +assertTrue(sensor.add(metricName, new TokenBucket())); +final KafkaMetric tkMetric = metrics.metric(metricName); + +// Recording a first value at T+0 to bring the remaining credits below zero +strictRecord(sensor, 30, time.milliseconds()); +assertEquals(-10, tkMetric.measurableValue(time.milliseconds()), 0.1); + +// Theoretically, we should wait 5s to bring back the avg rate to the define quota: +// 10 / 2 = 5s +time.sleep(5000); + +// Unlike the default rate based on a windowed sum, it works as expected. +assertEquals(0, tkMetric.measurableValue(time.milliseconds()), 0.1); +strictRecord(sensor, 30, time.milliseconds()); +assertEquals(-30, tkMetric.measurableValue(time.milliseconds()), 0.1); + +metrics.close(); +} + +private void strictRecord(Sensor sensor, double value, long timeMs) { +synchronized (sensor) { +sensor.checkQuotas(timeMs); Review comment: In the above two tests, I simulate a "strict quotas" in the sense that recording is not allowed if the quota is already violated. Therefore, I check it before recording the value. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)
dajac commented on a change in pull request #9114: URL: https://github.com/apache/kafka/pull/9114#discussion_r46517 ## File path: clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java ## @@ -209,4 +212,95 @@ public void shouldReturnPresenceOfMetrics() { assertThat(sensor.hasMetrics(), is(true)); } + +@Test +public void testStrictQuotaEnforcementWithRate() { +final Time time = new MockTime(0, System.currentTimeMillis(), 0); +final Metrics metrics = new Metrics(time); +final Sensor sensor = metrics.sensor("sensor", new MetricConfig() +.quota(Quota.upperBound(2)) +.timeWindow(1, TimeUnit.SECONDS) +.samples(11)); +final MetricName metricName = metrics.metricName("rate", "test-group"); +assertTrue(sensor.add(metricName, new Rate())); +final KafkaMetric rateMetric = metrics.metric(metricName); + +// Recording a first value at T+0 to bring the avg rate to 3 which is already +// above the quota. +strictRecord(sensor, 30, time.milliseconds()); +assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1); + +// Theoretically, we should wait 5s to bring back the avg rate to the define quota: +// ((30 / 10) - 2) / 2 * 10 = 5s +time.sleep(5000); + +// But, recording a second value is rejected because the avg rate is still equal +// to 3 after 5s. +assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1); +assertThrows(QuotaViolationException.class, () -> strictRecord(sensor, 30, time.milliseconds())); + +metrics.close(); +} + +@Test +public void testStrictQuotaEnforcementWithTokenBucket() { +final Time time = new MockTime(0, System.currentTimeMillis(), 0); +final Metrics metrics = new Metrics(time); +final Sensor sensor = metrics.sensor("sensor", new MetricConfig() +.quota(Quota.upperBound(2)) +.timeWindow(1, TimeUnit.SECONDS) +.samples(11)); +final MetricName metricName = metrics.metricName("credits", "test-group"); +assertTrue(sensor.add(metricName, new TokenBucket())); +final KafkaMetric tkMetric = metrics.metric(metricName); + +// Recording a first value at T+0 to bring the remaining credits below zero +strictRecord(sensor, 30, time.milliseconds()); +assertEquals(-10, tkMetric.measurableValue(time.milliseconds()), 0.1); + +// Theoretically, we should wait 5s to bring back the avg rate to the define quota: +// 10 / 2 = 5s +time.sleep(5000); + +// Unlike the default rate based on a windowed sum, it works as expected. 
+assertEquals(0, tkMetric.measurableValue(time.milliseconds()), 0.1); +strictRecord(sensor, 30, time.milliseconds()); +assertEquals(-30, tkMetric.measurableValue(time.milliseconds()), 0.1); + +metrics.close(); +} + +private void strictRecord(Sensor sensor, double value, long timeMs) { +synchronized (sensor) { +sensor.checkQuotas(timeMs); +sensor.record(value, timeMs, false); +} +} + +@Test +public void testRecordAndCheckQuotaUseMetricConfigOfEachStat() { +final Time time = new MockTime(0, System.currentTimeMillis(), 0); +final Metrics metrics = new Metrics(time); +final Sensor sensor = metrics.sensor("sensor"); + +final MeasurableStat stat1 = Mockito.mock(MeasurableStat.class); +final MetricName stat1Name = metrics.metricName("stat1", "test-group"); +final MetricConfig stat1Config = new MetricConfig().quota(Quota.upperBound(5)); +sensor.add(stat1Name, stat1, stat1Config); + +final MeasurableStat stat2 = Mockito.mock(MeasurableStat.class); +final MetricName stat2Name = metrics.metricName("stat2", "test-group"); +final MetricConfig stat2Config = new MetricConfig().quota(Quota.upperBound(10)); +sensor.add(stat2Name, stat2, stat2Config); + +sensor.record(10, 1); +Mockito.verify(stat1).record(stat1Config, 10, 1); +Mockito.verify(stat2).record(stat2Config, 10, 1); + +Mockito.when(stat1.measure(stat1Config, 2)).thenReturn(2.0); +Mockito.when(stat2.measure(stat2Config, 2)).thenReturn(2.0); +sensor.checkQuotas(2); Review comment: No. This test actually reproduce a bug that I have found. Basically, a stat can be added to the Sensor with a MetricsConfig but the Sensor was not using the provided one when recording a value but was using the one of the Sensor all the time. This test verifies that the correct config is used both for recording and measuring via calling checkQuota. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and us
[GitHub] [kafka] dajac commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)
dajac commented on a change in pull request #9114: URL: https://github.com/apache/kafka/pull/9114#discussion_r465889286 ## File path: core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala ## @@ -156,6 +174,24 @@ class ControllerMutationQuotaManager(private val config: ClientQuotaManagerConfi quotaMetricTags.asJava) } + protected def clientTokenBucketMetricName(quotaMetricTags: Map[String, String]): MetricName = { Review comment: Yes. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)
dajac commented on a change in pull request #9114: URL: https://github.com/apache/kafka/pull/9114#discussion_r465889447 ## File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java ## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.metrics.stats; + +import java.util.concurrent.TimeUnit; +import org.apache.kafka.common.metrics.MeasurableStat; +import org.apache.kafka.common.metrics.MetricConfig; + +public class TokenBucket implements MeasurableStat { +private final TimeUnit unit; +private double tokens; +private long lastUpdateMs; + +public TokenBucket() { +this(TimeUnit.SECONDS); +} + +public TokenBucket(TimeUnit unit) { +this.unit = unit; +this.tokens = 0; +this.lastUpdateMs = 0; +} + +@Override +public double measure(final MetricConfig config, final long timeMs) { +if (config.quota() == null) +return Long.MAX_VALUE; +final double quota = config.quota().bound(); +final double burst = burst(config); +refill(quota, burst, timeMs); +return this.tokens; +} + +@Override +public void record(final MetricConfig config, final double value, final long timeMs) { +if (config.quota() == null) +return; +final double quota = config.quota().bound(); +final double burst = burst(config); +refill(quota, burst, timeMs); +this.tokens = Math.min(burst, this.tokens - value); +} + +private void refill(final double quota, final double burst, final long timeMs) { +this.tokens = Math.min(burst, this.tokens + quota * convert(timeMs - lastUpdateMs)); +this.lastUpdateMs = timeMs; +} + +private double burst(final MetricConfig config) { +return (config.samples() - 1) * convert(config.timeWindowMs()) * config.quota().bound(); +} + +private double convert(final long timeMs) { +switch (unit) { Review comment: Sure, makes sense. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)
dajac commented on a change in pull request #9114: URL: https://github.com/apache/kafka/pull/9114#discussion_r465889824 ## File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java ## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.metrics.stats; + +import java.util.concurrent.TimeUnit; +import org.apache.kafka.common.metrics.MeasurableStat; +import org.apache.kafka.common.metrics.MetricConfig; + +public class TokenBucket implements MeasurableStat { +private final TimeUnit unit; +private double tokens; +private long lastUpdateMs; + +public TokenBucket() { +this(TimeUnit.SECONDS); +} + +public TokenBucket(TimeUnit unit) { +this.unit = unit; +this.tokens = 0; +this.lastUpdateMs = 0; +} + +@Override +public double measure(final MetricConfig config, final long timeMs) { +if (config.quota() == null) +return Long.MAX_VALUE; +final double quota = config.quota().bound(); +final double burst = burst(config); +refill(quota, burst, timeMs); +return this.tokens; +} + +@Override +public void record(final MetricConfig config, final double value, final long timeMs) { +if (config.quota() == null) +return; +final double quota = config.quota().bound(); +final double burst = burst(config); +refill(quota, burst, timeMs); +this.tokens = Math.min(burst, this.tokens - value); +} + +private void refill(final double quota, final double burst, final long timeMs) { +this.tokens = Math.min(burst, this.tokens + quota * convert(timeMs - lastUpdateMs)); +this.lastUpdateMs = timeMs; +} + +private double burst(final MetricConfig config) { +return (config.samples() - 1) * convert(config.timeWindowMs()) * config.quota().bound(); Review comment: Ack. I misunderstood it. `config.samples()` sounds good to me. ## File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java ## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.metrics.stats; + +import java.util.concurrent.TimeUnit; +import org.apache.kafka.common.metrics.MeasurableStat; +import org.apache.kafka.common.metrics.MetricConfig; + +public class TokenBucket implements MeasurableStat { Review comment: Sure. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
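To make the arithmetic discussed in the review comments above easier to follow, here is a standalone sketch of the token-bucket bookkeeping (not the class under review; all names are made up): tokens refill at the quota rate, the balance is capped at a burst of roughly samples * window * quota, and recording may drive the balance negative, which is the signal that the caller must be throttled until the bucket refills.
```java
/** Illustrative token bucket, assuming burst = samples * windowSeconds * quotaPerSec. */
public final class SimpleTokenBucket {
    private final double quotaPerSec; // tokens credited per second
    private final double capacity;    // maximum tokens the bucket can hold ("burst")
    private double tokens;
    private long lastUpdateMs;

    public SimpleTokenBucket(double quotaPerSec, int samples, int windowSeconds, long nowMs) {
        this.quotaPerSec = quotaPerSec;
        this.capacity = samples * windowSeconds * quotaPerSec;
        this.tokens = capacity; // start with a full bucket
        this.lastUpdateMs = nowMs;
    }

    // Credit tokens for the elapsed time, never exceeding the capacity.
    private void refill(long nowMs) {
        double elapsedSec = (nowMs - lastUpdateMs) / 1000.0;
        tokens = Math.min(capacity, tokens + quotaPerSec * elapsedSec);
        lastUpdateMs = nowMs;
    }

    /** Deduct {@code cost} tokens; the balance may go negative, signalling throttling. */
    public void record(double cost, long nowMs) {
        refill(nowMs);
        tokens = Math.min(capacity, tokens - cost);
    }

    /** Current balance; a negative value means the quota is exhausted until it refills. */
    public double measure(long nowMs) {
        refill(nowMs);
        return tokens;
    }
}
```
With quota = 5 units/sec, a 2-second window and 10 samples, the capacity works out to 100 tokens, which lines up with the "expect 100 credits" starting point in the TokenBucketTest quoted further down the thread.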
[GitHub] [kafka] kkonstantine closed pull request #8513: MINOR: jackson 2.10.3
kkonstantine closed pull request #8513: URL: https://github.com/apache/kafka/pull/8513 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics
[ https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17171642#comment-17171642 ] Ryanne Dolan commented on KAFKA-10339: -- I believe your proposed task.loadOffsets() is taken care of by the existing SinkTaskContext.offsets() method actually. This mechanism is used in other SinkTasks where the offsets are stored in the downstream system. I think we may already have all the interfaces required to make this work. > MirrorMaker2 Exactly-once Semantics > --- > > Key: KAFKA-10339 > URL: https://issues.apache.org/jira/browse/KAFKA-10339 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Reporter: Ning Zhang >Assignee: Ning Zhang >Priority: Major > Labels: needs-kip > > MirrorMaker2 is currently implemented on Kafka Connect Framework, more > specifically the Source Connector / Task, which do not provide exactly-once > semantics (EOS) out-of-the-box, as discussed in > https://github.com/confluentinc/kafka-connect-jdbc/issues/461, > https://github.com/apache/kafka/pull/5553, > https://issues.apache.org/jira/browse/KAFKA-6080 and > https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 > currently does not provide EOS. -- This message was sent by Atlassian Jira (v8.3.4#803005)
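A rough sketch of the existing hook being referred to here, assuming the task can somehow read its committed positions back from the target cluster (the loadOffsetsFromTargetCluster helper below is hypothetical): a sink task that stores offsets downstream hands its starting offsets to the framework through SinkTaskContext.offset(...) when partitions are assigned, rather than requiring a new loadOffsets() API.
{code:java}
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class OffsetSeedingSinkTask extends SinkTask {

    @Override
    public void start(Map<String, String> props) {
        // connector-specific setup
    }

    @Override
    public void open(Collection<TopicPartition> partitions) {
        // Hypothetical helper: recover the last committed position of each assigned
        // partition from the target cluster (e.g. the "fake" consumer group in this thread).
        Map<TopicPartition, Long> startingOffsets = loadOffsetsFromTargetCluster(partitions);
        if (!startingOffsets.isEmpty()) {
            // Existing Connect API: ask the framework to seek the consumer to these
            // offsets before delivering further records to put().
            context.offset(startingOffsets);
        }
    }

    private Map<TopicPartition, Long> loadOffsetsFromTargetCluster(Collection<TopicPartition> partitions) {
        return new HashMap<>(); // placeholder; implementation-specific
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        // write records and offsets to the target cluster transactionally (out of scope here)
    }

    @Override
    public void stop() {
    }

    @Override
    public String version() {
        return "0.0.1";
    }
}
{code}
This mirrors the pattern used by other sink connectors whose system of record for offsets is the downstream store, as noted in the comment above.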
[GitHub] [kafka] kkonstantine commented on pull request #8513: MINOR: jackson 2.10.3
kkonstantine commented on pull request #8513: URL: https://github.com/apache/kafka/pull/8513#issuecomment-669328325 Closing here in favor of https://github.com/apache/kafka/pull/9058 Thanks for suggesting the version upgrade @sullis This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
junrao commented on pull request #8657: URL: https://github.com/apache/kafka/pull/8657#issuecomment-669330984 @chia7712 : Here are my thoughts after considering this a bit more. The changes we made in DelayedJoin are complicated, and they still don't completely solve the deadlock issue. Adding more complexity there to solve the issue is probably not ideal. As a compromise, I was thinking of another approach. We could pass a flag down to ReplicaManager.appendRecords() to complete the delayed requests in a separate thread there. Only the GroupCoordinator would set this flag, so the background completeness check is limited to the offset commit topic. Since this topic is low volume, a single thread is likely enough to handle the load, so we don't have to make the number of threads configurable or worry about this thread being overwhelmed. The benefit of this approach is that the code is probably easier to understand, since we could keep most of the existing logic in GroupCoordinator unchanged. What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-10162) Use Token Bucket algorithm for controller mutation quota
[ https://issues.apache.org/jira/browse/KAFKA-10162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-10162: Summary: Use Token Bucket algorithm for controller mutation quota (was: Update Rate implementation to cope with spiky workload) > Use Token Bucket algorithm for controller mutation quota > > > Key: KAFKA-10162 > URL: https://issues.apache.org/jira/browse/KAFKA-10162 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10364) Use Token Bucket for all quotas
David Jacot created KAFKA-10364: --- Summary: Use Token Bucket for all quotas Key: KAFKA-10364 URL: https://issues.apache.org/jira/browse/KAFKA-10364 Project: Kafka Issue Type: Sub-task Reporter: David Jacot Assignee: David Jacot -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] dajac commented on pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)
dajac commented on pull request #9114: URL: https://github.com/apache/kafka/pull/9114#issuecomment-669343564 @junrao Thanks for your comments. I just pushed an update which addressed them. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah edited a comment on pull request #7222: KAFKA-8806 Reduce calls to validateOffsetsIfNeeded
mumrah edited a comment on pull request #7222: URL: https://github.com/apache/kafka/pull/7222#issuecomment-669345762  Failed run https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1860/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on pull request #7222: KAFKA-8806 Reduce calls to validateOffsetsIfNeeded
mumrah commented on pull request #7222: URL: https://github.com/apache/kafka/pull/7222#issuecomment-669345762  This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on pull request #7222: KAFKA-8806 Reduce calls to validateOffsetsIfNeeded
mumrah commented on pull request #7222: URL: https://github.com/apache/kafka/pull/7222#issuecomment-669345934 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dielhennr commented on a change in pull request #9101: KAFKA-10325: KIP-649 implementation
dielhennr commented on a change in pull request #9101: URL: https://github.com/apache/kafka/pull/9101#discussion_r465907401 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicConsumerConfig.java ## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.DynamicClientConfigUpdater; +import org.apache.kafka.clients.GroupRebalanceConfig; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.requests.DescribeConfigsRequest; +import org.apache.kafka.common.requests.DescribeConfigsResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; + +/** + * Handles the request and response of a dynamic client configuration update for the consumer + */ +public class DynamicConsumerConfig extends DynamicClientConfigUpdater { +/* Client to use */ +private ConsumerNetworkClient client; + +/* Configs to update */ +private GroupRebalanceConfig rebalanceConfig; + +/* Object to synchronize on when response is recieved */ +Object lock; + +/* Logger to use */ +private Logger log; + +/* The resource name to use when constructing a DescribeConfigsRequest */ +private final String clientId; + +/* Dynamic Configs recieved from the previous DescribeConfigsResponse */ +private Map previousDynamicConfigs; + +/* Indicates if we have recieved the initial dynamic configurations */ +private boolean initialConfigsFetched; + +public DynamicConsumerConfig(ConsumerNetworkClient client, Object lock, GroupRebalanceConfig config, Time time, LogContext logContext) { +super(time); +this.rebalanceConfig = config; +this.log = logContext.logger(DynamicConsumerConfig.class); +this.client = client; +this.lock = lock; +this.clientId = rebalanceConfig.clientId; +this.previousDynamicConfigs = new HashMap<>(); +this.initialConfigsFetched = false; +} + +/** + * Send a {@link DescribeConfigsRequest} to a node specifically for dynamic client configurations + * + * @return {@link RequestFuture} + */ +public RequestFuture maybeFetchInitialConfigs() { +if (!initialConfigsFetched) { +Node node = null; +while (node == null) { +node = client.leastLoadedNode(); Review comment: The main thread is calling the method and the main thread is polling. This was a hack to try to force the initial describe configs before the join group request. This returns non-null when there is a connected node, connecting node, or node with no connection. It may be best to just fetch the configs and poll for them if the node is non-null. 
If it is null then maybe skip the first synchronous DescribeConfigs RPC and fetch the configs asynchronously on the next try. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig
dajac commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r465908873 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -2982,12 +3089,33 @@ class KafkaApis(val requestChannel: RequestChannel, logIfDenied: Boolean = true, refCount: Int = 1): Boolean = { authorizer.forall { authZ => - val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL) - val actions = Collections.singletonList(new Action(operation, resource, refCount, logIfAllowed, logIfDenied)) - authZ.authorize(requestContext, actions).get(0) == AuthorizationResult.ALLOWED + if (authorizeAction(requestContext, operation, +resourceType, resourceName, logIfAllowed, logIfDenied, refCount, authZ)) { +true + } else { +operation match { + case ALTER | ALTER_CONFIGS | CREATE | DELETE => +requestContext.maybeFromControlPlane && + authorizeAction(requestContext, CLUSTER_ACTION, +resourceType, resourceName, logIfAllowed, logIfDenied, refCount, authZ) Review comment: I wonder if this is correct. Usually, we use `CLUSTER_ACTION` action with the `CLUSTER` resource. For instance, this is how we authorize control requests: ``` authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME) ``` I thought that we would do the same in this case. Don't we? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics
[ https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17171670#comment-17171670 ] Ning Zhang commented on KAFKA-10339: It is true that SinkTaskContext.offsets() has already been covered. I will test this updated idea out in my local environment; the extra step is to create a "fake" consumer group on the target cluster. > MirrorMaker2 Exactly-once Semantics > --- > > Key: KAFKA-10339 > URL: https://issues.apache.org/jira/browse/KAFKA-10339 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Reporter: Ning Zhang >Assignee: Ning Zhang >Priority: Major > Labels: needs-kip > > MirrorMaker2 is currently implemented on Kafka Connect Framework, more > specifically the Source Connector / Task, which do not provide exactly-once > semantics (EOS) out-of-the-box, as discussed in > https://github.com/confluentinc/kafka-connect-jdbc/issues/461, > https://github.com/apache/kafka/pull/5553, > https://issues.apache.org/jira/browse/KAFKA-6080 and > https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 > currently does not provide EOS. -- This message was sent by Atlassian Jira (v8.3.4#803005)
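For illustration, one way the "fake" consumer group mentioned above could be seeded on the target cluster is via the AdminClient's alterConsumerGroupOffsets call, which writes committed offsets for a group that has no live members. The bootstrap address, group name, topic, partition, and offset below are hypothetical placeholders; this is a sketch of the general mechanism, not the MirrorMaker2 design itself.
```
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class SeedTargetGroup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Hypothetical target-cluster bootstrap address.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "target-kafka:9092");

        try (Admin admin = Admin.create(props)) {
            // Commit an offset on behalf of a consumer group with no live members,
            // effectively creating the "fake" group on the target cluster.
            Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(
                new TopicPartition("source.topic", 0), new OffsetAndMetadata(42L));
            admin.alterConsumerGroupOffsets("mm2-fake-group", offsets).all().get();
        }
    }
}
```
Note that alterConsumerGroupOffsets only succeeds for a group that is empty or inactive, which matches the "fake group" idea described in the comment.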
[GitHub] [kafka] junrao commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)
junrao commented on a change in pull request #9114: URL: https://github.com/apache/kafka/pull/9114#discussion_r465921001 ## File path: clients/src/test/java/org/apache/kafka/common/metrics/TokenBucketTest.java ## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.metrics; + +import static org.junit.Assert.assertEquals; + +import java.util.concurrent.TimeUnit; +import org.apache.kafka.common.metrics.stats.TokenBucket; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.junit.Before; +import org.junit.Test; + +public class TokenBucketTest { +Time time; + +@Before +public void setup() { +time = new MockTime(0, System.currentTimeMillis(), System.nanoTime()); +} + +@Test +public void testRecord() { +// Rate = 5 unit / sec +// Burst = 2 * (11 - 1) = 20 units +MetricConfig config = new MetricConfig() +.quota(Quota.upperBound(5)) +.timeWindow(2, TimeUnit.SECONDS) +.samples(10); + +TokenBucket tk = new TokenBucket(); + +// Expect 100 credits at T +assertEquals(100, tk.measure(config, time.milliseconds()), 0.1); + +// Record 60 at T, expect 13 credits +tk.record(config, 60, time.milliseconds()); +assertEquals(40, tk.measure(config, time.milliseconds()), 0.1); + +// Advance by 2s, record 5, expect 45 credits +time.sleep(2000); +tk.record(config, 5, time.milliseconds()); +assertEquals(45, tk.measure(config, time.milliseconds()), 0.1); + +// Advance by 2s, record 60, expect -5 credits +time.sleep(2000); +tk.record(config, 60, time.milliseconds()); +assertEquals(-5, tk.measure(config, time.milliseconds()), 0.1); +} + +@Test +public void testUnrecord() { +// Rate = 5 unit / sec +// Burst = 2 * (11 - 1) = 20 units Review comment: we have 10 samples now. ## File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java ## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.metrics.stats; + +import java.util.concurrent.TimeUnit; +import org.apache.kafka.common.metrics.MeasurableStat; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Quota; + +import static org.apache.kafka.common.metrics.internals.MetricsUtils.convert; + +/** + * The {@link TokenBucket} is a {@link MeasurableStat} implementing a token bucket algorithm + * that is usable within a {@link org.apache.kafka.common.metrics.Sensor}. + * + * The {@link Quota#bound()} defined the refill rate of the bucket while the maximum burst or + * the maximum number of credits of the bucket is defined by + * {@link MetricConfig#samples() * MetricConfig#timeWindowMs() * Quota#bound()}. + * + * The quota is considered as exhausted when the amount of remaining credits in the bucket Review comment: Could we document how this quota behaves differently from existing quota? ## File path: clients/src/test/java/org/apache/kafka/common/metrics/TokenBucketTest.java ## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional informatio
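As a side note, the arithmetic in the quoted test (capacity = quota × window × samples = 5 × 2 s × 10 = 100 credits, with credits refilled at the quota rate and allowed to go negative on a burst) can be reproduced with a minimal, self-contained token bucket. The class below is an illustrative sketch of the algorithm under those assumptions, not Kafka's TokenBucket implementation.
```
/**
 * Minimal token-bucket sketch mirroring the arithmetic in the quoted test:
 * capacity = quota * windowSeconds * samples, credits refill at `quota`
 * per second, and recording a value subtracts credits (possibly below zero).
 * This is an illustration of the algorithm, not Kafka's TokenBucket class.
 */
public class SimpleTokenBucket {
    private final double refillPerSec;   // quota, e.g. 5 units/sec
    private final double capacity;       // quota * windowSeconds * samples, e.g. 100
    private double credits;
    private long lastUpdateMs;

    public SimpleTokenBucket(double quota, double windowSeconds, int samples, long nowMs) {
        this.refillPerSec = quota;
        this.capacity = quota * windowSeconds * samples;
        this.credits = capacity;
        this.lastUpdateMs = nowMs;
    }

    private void refill(long nowMs) {
        credits = Math.min(capacity, credits + refillPerSec * (nowMs - lastUpdateMs) / 1000.0);
        lastUpdateMs = nowMs;
    }

    public void record(double value, long nowMs) {
        refill(nowMs);
        credits -= value;                // may drop below zero on a large burst
    }

    public double measure(long nowMs) {
        refill(nowMs);
        return credits;
    }

    public static void main(String[] args) {
        long t = 0;
        SimpleTokenBucket tk = new SimpleTokenBucket(5, 2, 10, t);
        System.out.println(tk.measure(t));   // 100.0 credits at T
        tk.record(60, t);
        System.out.println(tk.measure(t));   // 40.0
        t += 2000;
        tk.record(5, t);
        System.out.println(tk.measure(t));   // 45.0  (40 + 10 refilled - 5)
        t += 2000;
        tk.record(60, t);
        System.out.println(tk.measure(t));   // -5.0  (45 + 10 refilled - 60)
    }
}
```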
[GitHub] [kafka] junrao commented on pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)
junrao commented on pull request #9114: URL: https://github.com/apache/kafka/pull/9114#issuecomment-669378432 ok to test This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dhruvilshah3 commented on a change in pull request #9110: MINOR: Ensure a reason is logged for all segment deletion operations
dhruvilshah3 commented on a change in pull request #9110: URL: https://github.com/apache/kafka/pull/9110#discussion_r465939741 ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -2227,14 +2210,17 @@ class Log(@volatile private var _dir: File, * @param segments The log segments to schedule for deletion * @param asyncDelete Whether the segment files should be deleted asynchronously */ - private def removeAndDeleteSegments(segments: Iterable[LogSegment], asyncDelete: Boolean): Unit = { + private def removeAndDeleteSegments(segments: Iterable[LogSegment], + asyncDelete: Boolean, + reason: SegmentDeletionReason): Unit = { if (segments.nonEmpty) { lock synchronized { // As most callers hold an iterator into the `segments` collection and `removeAndDeleteSegment` mutates it by // removing the deleted segment, we should force materialization of the iterator here, so that results of the // iteration remain valid and deterministic. val toDelete = segments.toList toDelete.foreach { segment => + info(s"${reason.reasonString(this, segment)}") Review comment: I think this is reasonable. Logging a segment per line will make it easier for us to diagnose issues. I made the change to log a segment per line for retention-related deletions. We still batch all segments in a single line for all other deletion events, eg. log deletion, truncation, etc. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dielhennr commented on a change in pull request #9101: KAFKA-10325: KIP-649 implementation
dielhennr commented on a change in pull request #9101: URL: https://github.com/apache/kafka/pull/9101#discussion_r465907401 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicConsumerConfig.java ## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.DynamicClientConfigUpdater; +import org.apache.kafka.clients.GroupRebalanceConfig; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.requests.DescribeConfigsRequest; +import org.apache.kafka.common.requests.DescribeConfigsResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; + +/** + * Handles the request and response of a dynamic client configuration update for the consumer + */ +public class DynamicConsumerConfig extends DynamicClientConfigUpdater { +/* Client to use */ +private ConsumerNetworkClient client; + +/* Configs to update */ +private GroupRebalanceConfig rebalanceConfig; + +/* Object to synchronize on when response is recieved */ +Object lock; + +/* Logger to use */ +private Logger log; + +/* The resource name to use when constructing a DescribeConfigsRequest */ +private final String clientId; + +/* Dynamic Configs recieved from the previous DescribeConfigsResponse */ +private Map previousDynamicConfigs; + +/* Indicates if we have recieved the initial dynamic configurations */ +private boolean initialConfigsFetched; + +public DynamicConsumerConfig(ConsumerNetworkClient client, Object lock, GroupRebalanceConfig config, Time time, LogContext logContext) { +super(time); +this.rebalanceConfig = config; +this.log = logContext.logger(DynamicConsumerConfig.class); +this.client = client; +this.lock = lock; +this.clientId = rebalanceConfig.clientId; +this.previousDynamicConfigs = new HashMap<>(); +this.initialConfigsFetched = false; +} + +/** + * Send a {@link DescribeConfigsRequest} to a node specifically for dynamic client configurations + * + * @return {@link RequestFuture} + */ +public RequestFuture maybeFetchInitialConfigs() { +if (!initialConfigsFetched) { +Node node = null; +while (node == null) { +node = client.leastLoadedNode(); Review comment: The main thread is calling the method and the main thread is polling. This was a hack to try to force the initial describe configs before the join group request. This returns non-null when there is a connected node, connecting node, or node with no connection. It may be best to just fetch the configs and poll for them if the node is non-null and ready to send a request to. 
If it is null or not ready then maybe skip the first synchronous DescribeConfigs RPC and fetch the configs asynchronously on the next try. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
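The flow described above (attempt the blocking initial DescribeConfigs only when a usable node is available, otherwise defer to an asynchronous fetch on a later poll) could look roughly like the sketch below. All class and method names here are hypothetical placeholders, not the API in this PR.
```
import java.util.Optional;

/**
 * Hypothetical sketch of the flow discussed above: only attempt the initial,
 * synchronous DescribeConfigs fetch if a usable node is already available;
 * otherwise defer to an asynchronous fetch on a later poll.
 */
interface NodeSelector {
    /** Returns a node id that is connected (or connectable) and ready for a request, if any. */
    Optional<String> findReadyNode();
}

class DynamicConfigFetcher {
    private final NodeSelector selector;
    private boolean initialConfigsFetched = false;

    DynamicConfigFetcher(NodeSelector selector) {
        this.selector = selector;
    }

    /** Try a blocking initial fetch once; otherwise leave it to the async path. */
    void maybeFetchInitialConfigs() {
        if (initialConfigsFetched)
            return;
        Optional<String> node = selector.findReadyNode();
        if (node.isPresent()) {
            sendDescribeConfigs(node.get());  // synchronous fetch before joining the group
            initialConfigsFetched = true;
        } else {
            scheduleAsyncFetch();             // pick the configs up on a later poll instead
        }
    }

    private void sendDescribeConfigs(String nodeId) { /* placeholder */ }

    private void scheduleAsyncFetch() { /* placeholder */ }
}
```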
[GitHub] [kafka] rajinisivaram commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rajinisivaram commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r465952166 ## File path: core/src/main/scala/kafka/admin/ConfigCommand.scala ## @@ -508,7 +563,15 @@ object ConfigCommand extends Config { val entityStr = (entitySubstr(ClientQuotaEntity.USER) ++ entitySubstr(ClientQuotaEntity.CLIENT_ID)).mkString(", ") val entriesStr = entries.asScala.map(e => s"${e._1}=${e._2}").mkString(", ") - println(s"Configs for ${entityStr} are ${entriesStr}") + println(s"Quota configs for ${entityStr} are ${entriesStr}") +} +// we describe user SCRAM credentials only when we are not describing client information +// and we are not given either --entity-default or --user-defaults +if (!entityTypes.contains(ConfigType.Client) && !entityNames.contains("")) { + getUserScramCredentialConfigs(adminClient, entityNames).foreach { case (user, description) => Review comment: What do we do if the user is not a SCRAM user? Won't this throw an exception? Can we make sure that user without quota or SCRAM credential doesn't print any errors or exceptions? ## File path: core/src/main/scala/kafka/server/AdminManager.scala ## @@ -980,4 +984,137 @@ class AdminManager(val config: KafkaConfig, entry.entity -> apiError }.toMap } + + def describeUserScramCredentials(users: Option[Seq[String]]): DescribeUserScramCredentialsResponseData = { +val retval = new DescribeUserScramCredentialsResponseData() + +def addToResults(user: String, userConfig: Properties) = { + val configKeys = userConfig.stringPropertyNames + val hasScramCredential = !ScramMechanism.values().toList.filter(key => key != ScramMechanism.UNKNOWN && configKeys.contains(key.getMechanismName)).isEmpty + if (hasScramCredential) { +val userScramCredentials = new UserScramCredential().setName(user) +ScramMechanism.values().foreach(mechanism => if (mechanism != ScramMechanism.UNKNOWN) { + val propertyValue = userConfig.getProperty(mechanism.getMechanismName) + if (propertyValue != null) { +val iterations = ScramCredentialUtils.credentialFromString(propertyValue).iterations +userScramCredentials.credentialInfos.add(new CredentialInfo().setMechanism(mechanism.getType).setIterations(iterations)) + } +}) +retval.userScramCredentials.add(userScramCredentials) + } +} + +if (!users.isDefined || users.get.isEmpty) + // describe all users + adminZkClient.fetchAllEntityConfigs(ConfigType.User).foreach { case (user, properties) => addToResults(user, properties) } +else { + // describe specific users + // https://stackoverflow.com/questions/24729544/how-to-find-duplicates-in-a-list + val duplicatedUsers = users.get.groupBy(identity).collect { case (x, Seq(_, _, _*)) => x } + if (duplicatedUsers.nonEmpty) { +retval.setError(Errors.INVALID_REQUEST.code()) +retval.setErrorMessage(s"Cannot describe SCRAM credentials for the same user twice in a single request: ${duplicatedUsers.mkString("[", ", ", "]")}") + } else +users.get.foreach { user => addToResults(user, adminZkClient.fetchEntityConfig(ConfigType.User, Sanitizer.sanitize(user))) } Review comment: should we catch exception? 
## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) { return new AlterClientQuotasResult(Collections.unmodifiableMap(futures)); } +@Override +public DescribeUserScramCredentialsResult describeUserScramCredentials(List users, DescribeUserScramCredentialsOptions options) { +final KafkaFutureImpl> future = new KafkaFutureImpl<>(); +final long now = time.milliseconds(); +Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()), +new ControllerNodeProvider()) { +@Override +public DescribeUserScramCredentialsRequest.Builder createRequest(int timeoutMs) { +return new DescribeUserScramCredentialsRequest.Builder( +new DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user -> +new DescribeUserScramCredentialsRequestData.UserName().setName(user)).collect(Collectors.toList(; +} + +@Override +public void handleResponse(AbstractResponse abstractResponse) { +DescribeUserScramCredentialsResponse response = (DescribeUserScramCredentialsResponse) abstractResponse; +Errors error = Errors.forCode(response.data().error()); +switch (error) { +case NONE: +DescribeUserScramCredentialsResponseData data = respons
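The duplicate-user check in the quoted AdminManager snippet (Scala's groupBy(identity) followed by collecting the keys that occur more than once) corresponds to the following generic Java idiom; this is an illustrative sketch, not the PR's code.
```
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

public class DuplicateUsers {
    /** Returns the users that appear more than once in the request. */
    static Set<String> duplicatedUsers(List<String> users) {
        Map<String, Long> counts = users.stream()
            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
        return counts.entrySet().stream()
            .filter(e -> e.getValue() > 1)
            .map(Map.Entry::getKey)
            .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        // Hypothetical input: "alice" is requested twice, so it would be rejected.
        System.out.println(duplicatedUsers(Arrays.asList("alice", "bob", "alice")));  // [alice]
    }
}
```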
[GitHub] [kafka] dajac commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)
dajac commented on a change in pull request #9114: URL: https://github.com/apache/kafka/pull/9114#discussion_r465968369 ## File path: clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java ## @@ -209,4 +212,95 @@ public void shouldReturnPresenceOfMetrics() { assertThat(sensor.hasMetrics(), is(true)); } + +@Test +public void testStrictQuotaEnforcementWithRate() { +final Time time = new MockTime(0, System.currentTimeMillis(), 0); +final Metrics metrics = new Metrics(time); +final Sensor sensor = metrics.sensor("sensor", new MetricConfig() +.quota(Quota.upperBound(2)) +.timeWindow(1, TimeUnit.SECONDS) +.samples(11)); Review comment: The config is correct. 11 samples. With the few samples in the test, the total window is actually 10s. This is why I use 10 in the formulas. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)
dajac commented on pull request #9114: URL: https://github.com/apache/kafka/pull/9114#issuecomment-669465274 @junrao Thanks. I have updated the PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
guozhangwang commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r465923769 ## File path: docs/streams/developer-guide/config-streams.html ## @@ -326,13 +321,18 @@ bootstrap.servers state.cleanup.delay.ms Low The amount of time in milliseconds to wait before deleting state when a partition has migrated. -600000 milliseconds +600000 milliseconds (10 minutes) state.dir High Directory location for state stores. /tmp/kafka-streams + task.timeout.ms +Medium Review comment: "medium" looks fine to me. But I don't feel strongly against it either. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax merged pull request #9126: MINOR: Code cleanup in StreamsResetter
mjsax merged pull request #9126: URL: https://github.com/apache/kafka/pull/9126 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)
junrao commented on a change in pull request #9114: URL: https://github.com/apache/kafka/pull/9114#discussion_r465989800 ## File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java ## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.metrics.stats; + +import java.util.concurrent.TimeUnit; +import org.apache.kafka.common.metrics.MeasurableStat; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Quota; + +import static org.apache.kafka.common.metrics.internals.MetricsUtils.convert; + +/** + * The {@link TokenBucket} is a {@link MeasurableStat} implementing a token bucket algorithm + * that is usable within a {@link org.apache.kafka.common.metrics.Sensor}. + * + * The {@link Quota#bound()} defined the refill rate of the bucket while the maximum burst or + * the maximum number of credits of the bucket is defined by + * {@link MetricConfig#samples() * MetricConfig#timeWindowMs() * Quota#bound()}. + * + * The quota is considered as exhausted when the amount of remaining credits in the bucket + * is below zero. The enforcement is done by the {@link org.apache.kafka.common.metrics.Sensor}. + * + * Token Bucket vs Rate based Quota: + * The current sampled rate based quota does not cope well with bursty workloads. The issue is + * that a unique and large sample can hold the average above the quota and this until it is Review comment: "this until it is" doesn't quite parse. ## File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java ## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.metrics.stats; + +import java.util.concurrent.TimeUnit; +import org.apache.kafka.common.metrics.MeasurableStat; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Quota; + +import static org.apache.kafka.common.metrics.internals.MetricsUtils.convert; + +/** + * The {@link TokenBucket} is a {@link MeasurableStat} implementing a token bucket algorithm + * that is usable within a {@link org.apache.kafka.common.metrics.Sensor}. + * + * The {@link Quota#bound()} defined the refill rate of the bucket while the maximum burst or + * the maximum number of credits of the bucket is defined by + * {@link MetricConfig#samples() * MetricConfig#timeWindowMs() * Quota#bound()}. + * + * The quota is considered as exhausted when the amount of remaining credits in the bucket + * is below zero. The enforcement is done by the {@link org.apache.kafka.common.metrics.Sensor}. + * + * Token Bucket vs Rate based Quota: + * The current sampled rate based quota does not cope well with bursty workloads. The issue is + * that a unique and large sample can hold the average above the quota and this until it is + * discarded. Practically, when this happens, one must wait until the sample is expired to + * bring the rate below the quota even though less time would be theoretically required. As an + * examples, let's imagine that we have: + * - Quota (Q) = 5 + * - Samples (S) = 100 + * - Window (W) = 1s + * A burst of 560 brings the average rate (R) to 5.6 (560 / 100). The throttle time is computed as Review comment: "The throttle time " : I guess this is the expected throttle time?
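Using only the figures quoted in the javadoc above (Q = 5, S = 100, W = 1 s, a burst of 560) together with its stated capacity formula (samples × timeWindow × quota), the contrast between the two approaches can be made concrete. The working below is an interpretation of those numbers, not a reconstruction of the truncated text:
```
\begin{aligned}
\text{bucket capacity} &= S \times W \times Q = 100 \times 1\,\text{s} \times 5 = 500 \text{ credits} \\
\text{credits after a burst of } 560 &= 500 - 560 = -60 \\
\text{time to refill back to } 0 &= \frac{60}{Q} = \frac{60}{5} = 12\,\text{s} \\[4pt]
\text{sampled rate after the burst} &= R = \frac{560}{S \times W} = 5.6 > Q = 5
\end{aligned}
```
In other words, the token bucket recovers as soon as the 60-credit deficit is repaid at the refill rate (about 12 s here), whereas under the sampled-rate quota the large sample keeps R at 5.6 until it is discarded, which can take up to S × W = 100 s, as the quoted text notes.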
[GitHub] [kafka] mjsax commented on pull request #9127: MINOR: fix HTML
mjsax commented on pull request #9127: URL: https://github.com/apache/kafka/pull/9127#issuecomment-669498317 Found one more tiny issue. Will merge if no objections are raised. kafka-site PR: https://github.com/apache/kafka-site/pull/283 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)
junrao commented on pull request #9114: URL: https://github.com/apache/kafka/pull/9114#issuecomment-669499409 ok to test This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)
junrao commented on pull request #9114: URL: https://github.com/apache/kafka/pull/9114#issuecomment-669499767 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)
junrao commented on pull request #9114: URL: https://github.com/apache/kafka/pull/9114#issuecomment-669500012 test this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax merged pull request #9067: MINOR: Streams integration tests should not call exit
mjsax merged pull request #9067: URL: https://github.com/apache/kafka/pull/9067 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)
dajac commented on a change in pull request #9114: URL: https://github.com/apache/kafka/pull/9114#discussion_r465997810 ## File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java ## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.metrics.stats; + +import java.util.concurrent.TimeUnit; +import org.apache.kafka.common.metrics.MeasurableStat; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Quota; + +import static org.apache.kafka.common.metrics.internals.MetricsUtils.convert; + +/** + * The {@link TokenBucket} is a {@link MeasurableStat} implementing a token bucket algorithm + * that is usable within a {@link org.apache.kafka.common.metrics.Sensor}. + * + * The {@link Quota#bound()} defined the refill rate of the bucket while the maximum burst or + * the maximum number of credits of the bucket is defined by + * {@link MetricConfig#samples() * MetricConfig#timeWindowMs() * Quota#bound()}. + * + * The quota is considered as exhausted when the amount of remaining credits in the bucket + * is below zero. The enforcement is done by the {@link org.apache.kafka.common.metrics.Sensor}. + * + * Token Bucket vs Rate based Quota: + * The current sampled rate based quota does not cope well with bursty workloads. The issue is + * that a unique and large sample can hold the average above the quota and this until it is + * discarded. Practically, when this happens, one must wait until the sample is expired to + * bring the rate below the quota even though less time would be theoretically required. As an + * examples, let's imagine that we have: + * - Quota (Q) = 5 + * - Samples (S) = 100 + * - Window (W) = 1s + * A burst of 560 brings the average rate (R) to 5.6 (560 / 100). The throttle time is computed as Review comment: Yes, that is correct. I have added "expected" to be clearer. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)
dajac commented on a change in pull request #9114: URL: https://github.com/apache/kafka/pull/9114#discussion_r465997624 ## File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java ## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.metrics.stats; + +import java.util.concurrent.TimeUnit; +import org.apache.kafka.common.metrics.MeasurableStat; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Quota; + +import static org.apache.kafka.common.metrics.internals.MetricsUtils.convert; + +/** + * The {@link TokenBucket} is a {@link MeasurableStat} implementing a token bucket algorithm + * that is usable within a {@link org.apache.kafka.common.metrics.Sensor}. + * + * The {@link Quota#bound()} defined the refill rate of the bucket while the maximum burst or + * the maximum number of credits of the bucket is defined by + * {@link MetricConfig#samples() * MetricConfig#timeWindowMs() * Quota#bound()}. + * + * The quota is considered as exhausted when the amount of remaining credits in the bucket + * is below zero. The enforcement is done by the {@link org.apache.kafka.common.metrics.Sensor}. + * + * Token Bucket vs Rate based Quota: + * The current sampled rate based quota does not cope well with bursty workloads. The issue is + * that a unique and large sample can hold the average above the quota and this until it is Review comment: i have removed the "and this". Does it parse better? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on a change in pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager
abbccdda commented on a change in pull request #9060: URL: https://github.com/apache/kafka/pull/9060#discussion_r465999685 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ## @@ -373,8 +373,15 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr final Set statefulTasks = new HashSet<>(); -final boolean probingRebalanceNeeded = -assignTasksToClients(fullMetadata, allSourceTopics, topicGroups, clientMetadataMap, partitionsForTask, statefulTasks); +final boolean probingRebalanceNeeded; +try { +probingRebalanceNeeded = assignTasksToClients(fullMetadata, allSourceTopics, topicGroups, clientMetadataMap, partitionsForTask, statefulTasks); +} catch (final TaskAssignmentException | TimeoutException e) { +return new GroupAssignment( Review comment: Since we could throw different exceptions here, would be good to add a log to indicate which type of exception is thrown. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
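A simplified sketch of the pattern under discussion (catch the assignment-time exception, log which type was thrown, and return a fallback assignment carrying an error code) is shown below; the class, field, and method names are placeholders rather than the actual StreamsPartitionAssignor API.
```
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Simplified sketch of the pattern under discussion: catch assignment-time
 * failures, log which exception type occurred, and fall back to an assignment
 * that carries an error code instead of propagating the exception.
 */
class AssignmentSketch {
    private static final Logger log = LoggerFactory.getLogger(AssignmentSketch.class);

    static final int ASSIGNMENT_ERROR = 1;

    static class Assignment {
        final int errorCode;
        Assignment(int errorCode) { this.errorCode = errorCode; }
    }

    interface Assignor {
        Assignment assign() throws Exception;
    }

    static Assignment assignOrFallback(Assignor assignor) {
        try {
            return assignor.assign();
        } catch (Exception e) {
            // Logging the concrete exception type makes it clear whether the
            // failure was a timeout or a task-assignment problem.
            log.error("Task assignment failed with {}: {}", e.getClass().getSimpleName(), e.getMessage());
            return new Assignment(ASSIGNMENT_ERROR);
        }
    }
}
```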
[GitHub] [kafka] mjsax commented on pull request #9067: MINOR: Streams integration tests should not call exit
mjsax commented on pull request #9067: URL: https://github.com/apache/kafka/pull/9067#issuecomment-669509500 Merged to `trunk` and cherry-picked to `2.6` branch. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)
junrao commented on pull request #9114: URL: https://github.com/apache/kafka/pull/9114#issuecomment-669510023 test this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)
junrao commented on pull request #9114: URL: https://github.com/apache/kafka/pull/9114#issuecomment-669509903 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)
dajac commented on pull request #9114: URL: https://github.com/apache/kafka/pull/9114#issuecomment-669512698 Sure. I will update the KIP and the mailing list tomorrow. Thanks, Jun! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager
mjsax commented on a change in pull request #9060: URL: https://github.com/apache/kafka/pull/9060#discussion_r466006589 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ## @@ -373,8 +373,15 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr final Set statefulTasks = new HashSet<>(); -final boolean probingRebalanceNeeded = -assignTasksToClients(fullMetadata, allSourceTopics, topicGroups, clientMetadataMap, partitionsForTask, statefulTasks); +final boolean probingRebalanceNeeded; +try { +probingRebalanceNeeded = assignTasksToClients(fullMetadata, allSourceTopics, topicGroups, clientMetadataMap, partitionsForTask, statefulTasks); +} catch (final TaskAssignmentException | TimeoutException e) { +return new GroupAssignment( Review comment: There are already corresponding log.error statement before those exceptions are thrown. No need to double log IMHO? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax merged pull request #9047: KAFKA-9274: Remove `retries` for global task
mjsax merged pull request #9047: URL: https://github.com/apache/kafka/pull/9047 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10365) Cannot delete topic in Windows
Nilesh Balu Jorwar created KAFKA-10365: -- Summary: Cannot delete topic in Windows Key: KAFKA-10365 URL: https://issues.apache.org/jira/browse/KAFKA-10365 Project: Kafka Issue Type: Bug Reporter: Nilesh Balu Jorwar Even after checking out and building the latest Kafka source code: when I ran ZooKeeper and the broker, created a topic, and then deleted it, the broker failed with the error below: *Suppressed: java.nio.file.AccessDeniedException: E:\nil\logs\kf-logs\javatechie2-0 -> E:\nil\logs\kf-logs\javatechie2-0.793787296b094bb69b4fa3135584a6f4-delete at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:83) at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97) at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:301) at sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287) at java.nio.file.Files.move(Files.java:1395) at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:911) ... 14 more [2020-08-05 16:12:30,383] WARN [ReplicaManager broker=0] Stopping serving replicas in dir E:\nil\logs\kf-logs (kafka.server.ReplicaManager) [2020-08-05 16:12:30,515] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(javatechie3-0, javatechie-0) (kafka.server.ReplicaFetcherManager) [2020-08-05 16:12:30,517] INFO [ReplicaAlterLogDirsManager on broker 0] Removed fetcher for partitions Set(javatechie3-0, javatechie-0) (kafka.server.ReplicaAlterLogDirsManager) [2020-08-05 16:12:30,525] WARN [ReplicaManager broker=0] Broker 0 stopped fetcher for partitions javatechie3-0,javatechie-0 and stopped moving logs for partitions because they are in the failed log directory E:\nil\logs\kf-logs. (kafka.server.ReplicaManager) [2020-08-05 16:12:30,527] WARN Stopping serving logs in dir E:\nil\logs\kf-logs (kafka.log.LogManager) [2020-08-05 16:12:30,535] ERROR Shutdown broker because all log dirs in E:\nil\logs\kf-logs have failed (kafka.log.LogManager) -- This message was sent by Atlassian Jira (v8.3.4#803005)
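For context, the failing frame above is an atomic rename of the partition directory to its "*-delete" name. The JDK pattern behind a call like Utils.atomicMoveWithFallback is roughly the sketch below (a generic illustration of the java.nio.file API, not Kafka's exact code); on Windows, an open handle inside the directory, e.g. from a memory-mapped index file, is a common cause of the AccessDeniedException seen here.
```
import java.io.IOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class AtomicMoveSketch {
    /**
     * Try an atomic rename first and fall back to a plain move if the file
     * system does not support atomic moves. On Windows, the move itself can
     * still fail with AccessDeniedException if another process (or an open
     * memory-mapped file) holds a handle inside the directory being renamed.
     */
    static void atomicMoveWithFallback(Path source, Path target) throws IOException {
        try {
            Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }
}
```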
[GitHub] [kafka] mjsax commented on pull request #9120: KAFKA-10316: Consider renaming getter method for Interactive Queries
mjsax commented on pull request #9120: URL: https://github.com/apache/kafka/pull/9120#issuecomment-669529543 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #9120: KAFKA-10316: Consider renaming getter method for Interactive Queries
mjsax commented on pull request #9120: URL: https://github.com/apache/kafka/pull/9120#issuecomment-669530846 Retest this please. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org