[GitHub] [kafka] chia7712 opened a new pull request #9128: KAFKA-7540 reduce session timeout to evict dead member in time and so…

2020-08-05 Thread GitBox


chia7712 opened a new pull request #9128:
URL: https://github.com/apache/kafka/pull/9128


   issue: https://issues.apache.org/jira/browse/KAFKA-7540
   
   In short, the LEAVE_GROUP request is sent to a broker that has been shut down, so the dead member left in the group prevents the broker from completing the rebalance.
   
   The test case creates two consumers (in different groups) and then shuts down their coordinators (brokers). After the first coordinator is shut down, the heartbeat thread of the first consumer times out, so it sends a LEAVE_GROUP request to the “next” coordinator. Unfortunately, the LEAVE_GROUP request can’t be processed if the “next” coordinator is the coordinator of the second consumer (yes, it is shut down as well).
   
   A simple fix is to reduce the session timeout so that the coordinator can evict the dead member in time.
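   To make the reasoning concrete, here is a toy model, not Kafka's GroupCoordinator: a coordinator evicts any member whose last heartbeat is older than the session timeout (on the client side this is the consumer's `session.timeout.ms`). `ToyCoordinator` and its methods are hypothetical names. A member that stops heartbeating, like the dead member whose LEAVE_GROUP never arrived, is evicted sooner when the timeout is shorter, so the rebalance stops waiting on it.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: not Kafka's GroupCoordinator. All names are hypothetical.
class ToyCoordinator {
    private final long sessionTimeoutMs;
    // member id -> time (ms) of the most recent heartbeat seen from that member
    private final Map<String, Long> lastHeartbeatMs = new HashMap<>();

    ToyCoordinator(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    // Records a heartbeat from a member (adding it if it is new).
    void heartbeat(String memberId, long nowMs) {
        lastHeartbeatMs.put(memberId, nowMs);
    }

    // Evicts every member whose last heartbeat is older than the session timeout
    // and returns the number of members still in the group.
    int expireDeadMembers(long nowMs) {
        lastHeartbeatMs.values().removeIf(lastMs -> nowMs - lastMs > sessionTimeoutMs);
        return lastHeartbeatMs.size();
    }

    boolean isMember(String memberId) {
        return lastHeartbeatMs.containsKey(memberId);
    }
}
```

   For example, with one member silent since t=0 and another heartbeating at t=5000 ms, a check at t=6000 ms keeps both members under a 10000 ms timeout but evicts the silent one under a 3000 ms timeout.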
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9128: KAFKA-7540 reduce session timeout to evict dead member in time and so…

2020-08-05 Thread GitBox


chia7712 commented on pull request #9128:
URL: https://github.com/apache/kafka/pull/9128#issuecomment-669020702


   I have looped the test 40 times; all runs pass.







[GitHub] [kafka] omkreddy commented on pull request #9050: KAFKA-10193: Add preemption for controller events that have callbacks

2020-08-05 Thread GitBox


omkreddy commented on pull request #9050:
URL: https://github.com/apache/kafka/pull/9050#issuecomment-669040004


   ok to test







[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-05 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r465565925



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -266,6 +275,178 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+    info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: $newNode")
+    zkClient.createFeatureZNode(newNode)
+    val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+    newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+    info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: $updatedNode")
+    zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (String) and a range of versions (defined by a
+   * {@link SupportedVersionRange}). It refers to a feature that a particular broker advertises
+   * support for. Each broker advertises the version ranges of its own supported features in its
+   * own BrokerIdZNode. The contents of the advertisement are specific to the particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (String) and a range of version levels (defined
+   * by a {@link FinalizedVersionRange}). Whenever the feature versioning system (KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in the cluster for a
+   * specified range of version levels. Also, the controller is the only entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *    A new Kafka cluster (i.e. it is deployed first time) is almost always started with IBP config
+   *    setting greater than or equal to KAFKA_2_7_IV0. We would like to start the cluster with all
+   *    the possible supported features finalized immediately. Assuming this is the case, the
+   *    controller will start up and notice that the FeatureZNode is absent in the new cluster,
+   *    it will then create a FeatureZNode (with enabled status) containing the entire list of
+   *    default supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *    Imagine there is an existing Kafka cluster with IBP config less than KAFKA_2_7_IV0, and the
+   *    broker binary has been upgraded to a newer version that supports the feature versioning
+   *    system (KIP-584). This means the user is upgrading from an earlier version of the broker
+   *    binary. In this case, we want to start with no finalized features and allow the user to
+   *    finalize them whenever they are ready i.e. in the future whenever the user sets IBP config
+   *    to be greater than or equal to KAFKA_2_7_IV0, then the user could start finalizing the
+   *    features. This process ensures we do not enable all the possible features immediately after
+   *    an upgrade, which could be harmful to Kafka.
+   *    This is how we handle such a case:
+   *      - Before the IBP config upgrade (i.e. IBP config set to less than KAFKA_2_7_IV0), the
+   *        controller will start up and check if the FeatureZNode is absent. If absent, it will
+   *        react by creating a FeatureZNode with disabled status and empty finalized features.
+   *        Otherwise, if a node already exists in enabled status then the controller will just
+   *        flip the status to disabled and clear the finalized features.
+   *      - After the IBP config upgrade (i.e. IBP config set to greater than or equal to
+   *        KAFKA_2_7_IV0), when the controller starts up it will check if the FeatureZNode exists
+   *        and whether it is disabled. In such a case, it won’t upgrade all features immediately.
+   *        Instead it will just switch the FeatureZNode status to enabled status. This lets
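The cases in the doc comment above can be summarized as a small decision function. This is a hypothetical sketch: `FeatureZNodeStatus`, `FeatureNode` and `FeatureNodeSetup.decide` are illustrative names, not Kafka classes, and the handling of a node that already has the target status is an assumption, since the quoted comment is cut off before covering it.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of the FeatureZNode decisions described above; not Kafka code.
enum FeatureZNodeStatus { ENABLED, DISABLED }

final class FeatureNode {
    final FeatureZNodeStatus status;
    final Map<String, Short> finalizedMaxLevels; // feature name -> finalized max version level

    FeatureNode(FeatureZNodeStatus status, Map<String, Short> finalizedMaxLevels) {
        this.status = status;
        this.finalizedMaxLevels = Collections.unmodifiableMap(finalizedMaxLevels);
    }
}

final class FeatureNodeSetup {
    // Returns the node the controller would write, given whether IBP >= KAFKA_2_7_IV0,
    // the node currently stored (if any), and the default supported features.
    static FeatureNode decide(boolean ibpAtLeast27IV0,
                              Optional<FeatureNode> existing,
                              Map<String, Short> defaultSupported) {
        if (ibpAtLeast27IV0) {
            if (!existing.isPresent()) {
                // Case 1: new cluster bootstrap, finalize all default supported features.
                return new FeatureNode(FeatureZNodeStatus.ENABLED, defaultSupported);
            }
            if (existing.get().status == FeatureZNodeStatus.DISABLED) {
                // Case 2, after the IBP upgrade: only flip the status, finalize nothing new.
                return new FeatureNode(FeatureZNodeStatus.ENABLED, existing.get().finalizedMaxLevels);
            }
            // Assumption: an already-enabled node is left unchanged.
            return existing.get();
        }
        if (!existing.isPresent() || existing.get().status == FeatureZNodeStatus.ENABLED) {
            // Case 2, before the IBP upgrade: disabled status and no finalized features.
            return new FeatureNode(FeatureZNodeStatus.DISABLED, Collections.emptyMap());
        }
        // Assumption: an already-disabled node is left unchanged.
        return existing.get();
    }
}
```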

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-05 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463934710



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -266,6 +275,179 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+    info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: $newNode")
+    zkClient.createFeatureZNode(newNode)
+    val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+    newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+    info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: $updatedNode")
+    zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (String) and a range of versions (defined by a
+   * {@link SupportedVersionRange}). It refers to a feature that a particular broker advertises
+   * support for. Each broker advertises the version ranges of its own supported features in its
+   * own BrokerIdZNode. The contents of the advertisement are specific to the particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (String) and a range of version levels (defined
+   * by a {@link FinalizedVersionRange}). Whenever the feature versioning system (KIP-584) is
+   * enabled, the finalized features are stored in ZK in the cluster-wide common FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in the cluster for a
+   * specified range of version levels. Also, the controller is the one and only entity modifying
+   * the information about finalized features and their version levels.
+   *
+   * This method sets up the FeatureZNode with enabled status. This status means the feature
+   * versioning system (KIP-584) is enabled, and, the finalized features stored in the FeatureZNode
+   * are active. This status should be written by the controller to the FeatureZNode only when the
+   * broker IBP config is greater than or equal to KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *    A new Kafka cluster (i.e. it is deployed first time) is almost always started with IBP config
+   *    setting greater than or equal to KAFKA_2_7_IV0. We would like to start the cluster with all
+   *    the possible supported features finalized immediately. Assuming this is the case, the
+   *    controller will start up and notice that the FeatureZNode is absent in the new cluster,
+   *    it will then create a FeatureZNode (with enabled status) containing the entire list of
+   *    default supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *    Imagine there is an existing Kafka cluster with IBP config less than KAFKA_2_7_IV0, and the
+   *    broker binary has been upgraded to a newer version that supports the feature versioning
+   *    system (KIP-584). This means the user is upgrading from an earlier version of the broker
+   *    binary. In this case, we want to start with no finalized features and allow the user to
+   *    finalize them whenever they are ready i.e. in the future whenever the user sets IBP config
+   *    to be greater than or equal to KAFKA_2_7_IV0, then the user could start finalizing the
+   *    features. The reason to do this is that enabling all the possible features immediately after
+   *    an upgrade could be harmful to the cluster.
+   *    This is how we handle such a case:
+   *      - Before the IBP config upgrade (i.e. IBP config set to less than KAFKA_2_7_IV0), the
+   *        controller will start up and check if the FeatureZNode is absent. If absent, then it
+   *        will react by creating a FeatureZNode with disabled status and empty finalized features.
+   *        Otherwise, if a node already exists in enabled status then the controller will just
+   *        flip the status to disabled and clear the finalized features.
+   *      - After the IBP config upgrade (i.e. IBP config set to greater than or equal to
+   *        KAFKA_2_7_IV0), when the controller starts up it will check if the FeatureZNode exists
+   *        and whether it is disabled. In such a case, it won’t upgrade all features immediat

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-05 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r465570359



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -2945,6 +2948,130 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  def handleUpdateFeatures(request: RequestChannel.Request): Unit = {

Review comment:
   This does not seem to be required, since it is already achieved via `UpdateFeaturesTest`. In fact, there we test using the admin client, which is even better, as it tests the client-to-server functionality end to end.
   
   What do we gain by adding the additional tests in `KafkaApisTest` ?









[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-05 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r465572011



##
File path: clients/src/main/resources/common/message/UpdateFeaturesResponse.json
##
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "response",
+  "name": "UpdateFeaturesResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [

Review comment:
   I don't see that we consistently use a top-level error code, so I will leave it as is.

##
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##
@@ -1214,6 +1215,71 @@ default AlterClientQuotasResult alterClientQuotas(Collection entries, AlterClientQuotasOptions options);
 
+    /**
+     * Describes finalized as well as supported features. By default, the request is issued to any
+     * broker. It can be optionally directed only to the controller via DescribeFeaturesOptions
+     * parameter. This is particularly useful if the user requires strongly consistent reads of
+     * finalized features.
+     *
+     * The following exceptions can be anticipated when calling {@code get()} on the future from the
+     * returned {@link DescribeFeaturesResult}:
+     *
+     *   {@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the describe operation could finish.
+     *
+     *
+     * @param options   the options to use
+     *
+     * @return  the {@link DescribeFeaturesResult} containing the result
+     */
+    DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
+
+    /**
+     * Applies specified updates to finalized features. This operation is not transactional so it
+     * may succeed for some features while fail for others.
+     *
+     * The API takes in a map of finalized feature name to {@link FeatureUpdate} that needs to be
+     * applied. Each entry in the map specifies the finalized feature to be added or updated or
+     * deleted, along with the new max feature version level value. This request is issued only to
+     * the controller since the API is only served by the controller. The return value contains an
+     * error code for each supplied {@link FeatureUpdate}, and the code indicates if the update
+     * succeeded or failed in the controller.
+     *
+     * Downgrade of feature version level is not a regular operation/intent. It is only allowed
+     * in the controller if the {@link FeatureUpdate} has the allowDowngrade flag set - setting this
+     * flag conveys user intent to attempt downgrade of a feature max version level. Note that
+     * despite the allowDowngrade flag being set, certain downgrades may be rejected by the
+     * controller if it is deemed impossible.
+     * Deletion of a finalized feature version is not a regular operation/intent. It could be
+     * done by setting the allowDowngrade flag to true in the {@link FeatureUpdate}, and, setting
+     * the max version level to be less than 1.
+     *
+     *
+     * The following exceptions can be anticipated when calling {@code get()} on the futures
+     * obtained from the returned {@link UpdateFeaturesResult}:
+     *
+     *   {@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+     *   If the authenticated user didn't have alter access to the cluster.
+     *   {@link org.apache.kafka.common.errors.InvalidRequestException}

Review comment:
   Answered below.
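   The downgrade and deletion rules in the `updateFeatures` Javadoc above can be sketched as a small classifier. `UpdateOutcome` and `FeatureUpdateRules` are hypothetical names. The sketch does not model the extra controller-side checks that may reject a downgrade deemed impossible, and treating an unchanged level as a no-op (and a level below 1 without `allowDowngrade` as a rejection) is an assumption.

```java
import java.util.Optional;

// Hypothetical classifier following the updateFeatures Javadoc; not the controller's code.
enum UpdateOutcome { ADD_OR_UPGRADE, NO_CHANGE, DOWNGRADE, DELETE, REJECTED }

final class FeatureUpdateRules {
    // currentMaxLevel is empty when the feature is not finalized yet.
    static UpdateOutcome classify(Optional<Short> currentMaxLevel, short newMaxLevel, boolean allowDowngrade) {
        if (newMaxLevel < 1) {
            // Deletion: a max level below 1 together with the allowDowngrade flag.
            return allowDowngrade ? UpdateOutcome.DELETE : UpdateOutcome.REJECTED;
        }
        if (!currentMaxLevel.isPresent() || newMaxLevel > currentMaxLevel.get()) {
            return UpdateOutcome.ADD_OR_UPGRADE;
        }
        if (newMaxLevel == currentMaxLevel.get()) {
            return UpdateOutcome.NO_CHANGE; // assumption: same level is a no-op
        }
        // newMaxLevel is below the current level: a downgrade needs the explicit flag.
        return allowDowngrade ? UpdateOutcome.DOWNGRADE : UpdateOutcome.REJECTED;
    }
}
```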









[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-05 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r465572011



##
File path: clients/src/main/resources/common/message/UpdateFeaturesResponse.json
##
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "response",
+  "name": "UpdateFeaturesResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [

Review comment:
   I don't see that we consistently use a top-level error code, so I will leave it as is. It feels OK for this API not to use one, as it makes little difference.









[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-05 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r465572011



##
File path: clients/src/main/resources/common/message/UpdateFeaturesResponse.json
##
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "response",
+  "name": "UpdateFeaturesResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [

Review comment:
   I don't see that we consistently use a top-level error code across other Kafka APIs, so I will leave it as is. It feels OK for this API not to use one, as it does not make a significant difference.









[jira] [Issue Comment Deleted] (KAFKA-10095) In LogCleanerManagerTest replace get().nonEmpty call with contains

2020-08-05 Thread Thomas Reuhl (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10095?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Reuhl updated KAFKA-10095:
-
Comment: was deleted

(was: {{I'm a newbie to Kafka and would like to pick up the issue. One idea would be to change the code to actually check that the topicPartition is _contained_ in the cleaner checkpoints:}}
{code:java}
cleanerManager.setCleaningState(topicPartition, LogCleaningInProgress)
cleanerManager.doneCleaning(topicPartition, log.dir, 1)
assertTrue(cleanerManager.cleaningState(topicPartition).isEmpty)
assertTrue(cleanerManager.allCleanerCheckpoints.contains(topicPartition))

cleanerManager.setCleaningState(topicPartition, LogCleaningAborted)
cleanerManager.doneCleaning(topicPartition, log.dir, 1)
assertEquals(LogCleaningPaused(1), cleanerManager.cleaningState(topicPartition).get)
assertTrue(cleanerManager.allCleanerCheckpoints.contains(topicPartition))
{code}
 )

> In LogCleanerManagerTest replace get().nonEmpty call with contains
> --
>
> Key: KAFKA-10095
> URL: https://issues.apache.org/jira/browse/KAFKA-10095
> Project: Kafka
>  Issue Type: Improvement
>  Components: log cleaner, unit tests
>Reporter: Jakob Homan
>Assignee: Sarah Gonsalves
>Priority: Trivial
>  Labels: newbie
>
> n.b. This is a newbie ticket designed to be an introduction to contributing 
> for the assignee.
> In kafka.log.LogCleanerManagerTest we have two calls to 
> .get(something).nonEmpty, which is equivalent to .contains(something).  We 
> should simplify these calls.
> {code}
> cleanerManager.doneCleaning(topicPartition, log.dir, 1)
> assertTrue(cleanerManager.cleaningState(topicPartition).isEmpty)
> assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).nonEmpty)
> cleanerManager.setCleaningState(topicPartition, LogCleaningAborted)
> cleanerManager.doneCleaning(topicPartition, log.dir, 1)
> assertEquals(LogCleaningPaused(1), cleanerManager.cleaningState(topicPartition).get)
> assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).nonEmpty)
> {code}
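
In Scala, `Map.get` returns an `Option`, so `get(k).nonEmpty` and `contains(k)` always agree, which is what makes the simplification in this ticket safe. The Java analogue below uses hypothetical helper names to show the same pair of checks and one caveat that does not arise in Scala: Java's `Map.get` returns `null` both for an absent key and for a key mapped to `null`, so the two checks agree only when the map holds no `null` values.

```java
import java.util.Map;
import java.util.Optional;

// Java analogue of the Scala get(k).nonEmpty vs contains(k) checks; names are hypothetical.
final class ContainsVsGet {
    // Mirrors get(k).nonEmpty: false for an absent key AND for a key mapped to null.
    static <K, V> boolean viaGet(Map<K, V> m, K key) {
        return Optional.ofNullable(m.get(key)).isPresent();
    }

    // Mirrors contains(k): true whenever the key is present, even if its value is null.
    static <K, V> boolean viaContains(Map<K, V> m, K key) {
        return m.containsKey(key);
    }
}
```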



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10363) Broker try to connect to a new cluster when there are changes in zookeeper.connect properties

2020-08-05 Thread Alexey Kornev (Jira)
Alexey Kornev created KAFKA-10363:
-

 Summary: Broker try to connect to a new cluster when there are 
changes in zookeeper.connect properties
 Key: KAFKA-10363
 URL: https://issues.apache.org/jira/browse/KAFKA-10363
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.3.1, 2.4.0
 Environment: 3 Kafka brokers (v2.3.1, v2.4.0) with Zookeeper cluster 
(3.4.10)
Ubuntu 18.04 LTS
Reporter: Alexey Kornev


We've just successfully set up a Kafka cluster consisting of 3 brokers and ran into the following issue: when we change the order of the ZooKeeper servers in the zookeeper.connect property in the server.properties files and restart a Kafka broker, that broker tries to connect to a new Kafka cluster. As a result, the broker throws an error and shuts down.

For example, server.properties on the first broker:
{code:java}
broker.id=-1
...
zookeeper.connect=node_1:2181/kafka,node_2:2181/kafka,node_3:2181/kafka
{code}

We changed it to
{code:java}
broker.id=-1
...
zookeeper.connect=node_2:2181/kafka,node_3:2181/kafka,node_1:2181/kafka
{code}

and restarted the Kafka broker.
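
One possible explanation, offered as an assumption rather than a confirmed diagnosis: Kafka documents `zookeeper.connect` with the chroot appended once after the host list, in the form `hostname1:port1,hostname2:port2,hostname3:port3/chroot/path`. If the client splits the string at the first `/`, as this hypothetical `ConnectString.parse` sketch does (it is not ZooKeeper's client code), the configuration above yields a single host plus a chroot that embeds the remaining hosts, and reordering the hosts changes that chroot. That would be consistent with the broker log, which opens a session to `node_2:2181` only.

```java
// Sketch of splitting a ZooKeeper connect string into hosts and chroot, assuming
// everything from the first '/' onward is treated as the chroot path.
final class ConnectString {
    final String hosts;
    final String chroot; // empty when no chroot is given

    private ConnectString(String hosts, String chroot) {
        this.hosts = hosts;
        this.chroot = chroot;
    }

    static ConnectString parse(String connect) {
        int slash = connect.indexOf('/');
        if (slash < 0) {
            return new ConnectString(connect, "");
        }
        return new ConnectString(connect.substring(0, slash), connect.substring(slash));
    }
}
```

Under that reading, `zookeeper.connect=node_1:2181,node_2:2181,node_3:2181/kafka` keeps the chroot at `/kafka` regardless of host order.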

Logs:

{code:java}
[2020-08-05 09:07:55,658] INFO [ExpirationReaper-0-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-08-05 09:07:55,658] INFO [ExpirationReaper-0-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-08-05 09:07:55,658] INFO [ExpirationReaper-0-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-08-05 09:07:57,070] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2020-08-05 09:07:57,656] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2020-08-05 09:07:57,657] INFO starting (kafka.server.KafkaServer)
[2020-08-05 09:07:57,658] INFO Connecting to zookeeper on node_2:2181/kafka,node_3:2181/kafka,node_1:2181/kafka (kafka.server.KafkaServer)
[2020-08-05 09:07:57,685] INFO [ZooKeeperClient Kafka server] Initializing a new session to node_2:2181. (kafka.zookeeper.ZooKeeperClient)
[2020-08-05 09:07:57,690] INFO Client environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf, built on 03/06/2019 16:18 GMT (org.apache.zookeeper.ZooKeeper)
[2020-08-05 09:07:57,693] INFO Client environment:host.name=localhost (org.apache.zookeeper.ZooKeeper)
[2020-08-05 09:07:57,693] INFO Client environment:java.version=11.0.8 (org.apache.zookeeper.ZooKeeper)
[2020-08-05 09:07:57,696] INFO Client environment:java.vendor=Ubuntu (org.apache.zookeeper.ZooKeeper)
[2020-08-05 09:07:57,696] INFO Client environment:java.home=/usr/lib/jvm/java-11-openjdk-amd64 (org.apache.zookeeper.ZooKeeper)
[2020-08-05 09:07:57,696] INFO Client 
environment:java.class.path=/opt/kafka/current/bin/../libs/activation-1.1.1.jar:/opt/kafka/current/bin/../libs/aopalliance-repackaged-2.5.0.jar:/opt/kafka/current/bin/../libs/argparse4j-0.7.0.jar:/opt/kafka/current/bin/../libs/audience-annotations-0.5.0.jar:/opt/kafka/current/bin/../libs/commons-lang3-3.8.1.jar:/opt/kafka/current/bin/../libs/connect-api-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-basic-auth-extension-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-file-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-json-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-runtime-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-transforms-2.3.1.jar:/opt/kafka/current/bin/../libs/guava-20.0.jar:/opt/kafka/current/bin/../libs/hk2-api-2.5.0.jar:/opt/kafka/current/bin/../libs/hk2-locator-2.5.0.jar:/opt/kafka/current/bin/../libs/hk2-utils-2.5.0.jar:/opt/kafka/current/bin/../libs/jackson-annotations-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-core-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-databind-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-dataformat-csv-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-datatype-jdk8-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-jaxrs-base-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-jaxrs-json-provider-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-module-jaxb-annotations-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-module-paranamer-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-module-scala_2.12-2.10.0.jar:/opt/kafka/current/bin/../libs/jakarta.activation-api-1.2.1.jar:/opt/kafka/current/bin/../libs/jakarta.annotation-api-1.3.4.jar:/opt/kafka/current/bin/../libs/jakarta.inject-2.5.0.jar:/opt/kafka/current/bin/../libs/jakarta.ws.rs-api-2.1.5.jar:/opt/kafka/current/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/opt/kafka/current/bin/../libs/javassist-3.22.0-CR2.jar:/opt/kafka/current/bin/../libs/javax.servlet-api-3.1.0.jar:/opt/kafka/current/bin/../libs/javax.ws.rs-api-2.1.1.jar:/opt/kafka/current
/bin/../libs/jaxb-api-2.3.0.jar:/opt/kafka/current/bin/../libs/jersey-client-2.28.jar:/opt/kafka/

[jira] [Updated] (KAFKA-10363) Broker try to connect to a new cluster when there are changes in zookeeper.connect properties

2020-08-05 Thread Alexey Kornev (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Kornev updated KAFKA-10363:
--
Description: 
We've just successfully set up a Kafka cluster consisting of 3 brokers and ran into the following issue: when we change the order of the ZooKeeper servers in the zookeeper.connect property in the server.properties files and restart a Kafka broker, that broker tries to connect to a new Kafka cluster. As a result, the broker throws an error and shuts down.

For example, server.properties on the first broker:
{code:java}
broker.id=-1
...
zookeeper.connect=node_1:2181/kafka,node_2:2181/kafka,node_3:2181/kafka
{code}
We changed it to
{code:java}
broker.id=-1
...
zookeeper.connect=node_2:2181/kafka,node_3:2181/kafka,node_1:2181/kafka {code}
and restarted the Kafka broker.

Logs:
{code:java}
[2020-08-05 09:07:55,658] INFO [ExpirationReaper-0-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-08-05 09:07:55,658] INFO [ExpirationReaper-0-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-08-05 09:07:55,658] INFO [ExpirationReaper-0-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-08-05 09:07:57,070] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2020-08-05 09:07:57,656] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2020-08-05 09:07:57,657] INFO starting (kafka.server.KafkaServer)
[2020-08-05 09:07:57,658] INFO Connecting to zookeeper on node_2:2181/kafka,node_3:2181/kafka,node_1:2181/kafka (kafka.server.KafkaServer)
[2020-08-05 09:07:57,685] INFO [ZooKeeperClient Kafka server] Initializing a new session to node_2:2181. (kafka.zookeeper.ZooKeeperClient)
[2020-08-05 09:07:57,690] INFO Client environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf, built on 03/06/2019 16:18 GMT (org.apache.zookeeper.ZooKeeper)
[2020-08-05 09:07:57,693] INFO Client environment:host.name=localhost (org.apache.zookeeper.ZooKeeper)
[2020-08-05 09:07:57,693] INFO Client environment:java.version=11.0.8 (org.apache.zookeeper.ZooKeeper)
[2020-08-05 09:07:57,696] INFO Client environment:java.vendor=Ubuntu (org.apache.zookeeper.ZooKeeper)
[2020-08-05 09:07:57,696] INFO Client environment:java.home=/usr/lib/jvm/java-11-openjdk-amd64 (org.apache.zookeeper.ZooKeeper)
[2020-08-05 09:07:57,696] INFO Client 
environment:java.class.path=/opt/kafka/current/bin/../libs/activation-1.1.1.jar:/opt/kafka/current/bin/../libs/aopalliance-repackaged-2.5.0.jar:/opt/kafka/current/bin/../libs/argparse4j-0.7.0.jar:/opt/kafka/current/bin/../libs/audience-annotations-0.5.0.jar:/opt/kafka/current/bin/../libs/commons-lang3-3.8.1.jar:/opt/kafka/current/bin/../libs/connect-api-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-basic-auth-extension-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-file-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-json-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-runtime-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-transforms-2.3.1.jar:/opt/kafka/current/bin/../libs/guava-20.0.jar:/opt/kafka/current/bin/../libs/hk2-api-2.5.0.jar:/opt/kafka/current/bin/../libs/hk2-locator-2.5.0.jar:/opt/kafka/current/bin/../libs/hk2-utils-2.5.0.jar:/opt/kafka/current/bin/../libs/jackson-annotations-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-core-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-databind-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-dataformat-csv-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-datatype-jdk8-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-jaxrs-base-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-jaxrs-json-provider-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-module-jaxb-annotations-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-module-paranamer-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-module-scala_2.12-2.10.0.jar:/opt/kafka/current/bin/../libs/jakarta.activation-api-1.2.1.jar:/opt/kafka/current/bin/../libs/jakarta.annotation-api-1.3.4.jar:/opt/kafka/current/bin/../libs/jakarta.inject-2.5.0.jar:/opt/kafka/current/bin/../libs/jakarta.ws.rs-api-2.1.5.jar:/opt/kafka/current/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/opt/kafka/current/bin/../libs/javassist-3.22.0-CR2.jar:/opt/kafka/current/bin/../libs/javax.servlet-api-3.1.0.jar:/opt/kafka/current/bin/../libs/javax.ws.rs-api-2.1.1.jar:/opt/kafka/current
/bin/../libs/jaxb-api-2.3.0.jar:/opt/kafka/current/bin/../libs/jersey-client-2.28.jar:/opt/kafka/current/bin/../libs/jersey-common-2.28.jar:/opt/kafka/current/bin/../libs/jersey-container-servlet-2.28.jar:/opt/kafka/current/bin/../libs/jersey-container-servlet-core-2.28.jar:/opt/kafka/current/bin/../libs/jersey-hk2-2.28.jar:/opt/kafka/current/bin/../libs/jersey-media-jaxb-2.28.jar:/opt/kafka/current/bin/../libs/jersey

[GitHub] [kafka] cmccabe commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-05 Thread GitBox


cmccabe commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r465708837



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -1212,17 +1221,32 @@ class Partition(val topicPartition: TopicPartition,
 
   private def expandIsr(newIsr: Set[Int]): Unit = {
     val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.toList, zkVersion)
-    val zkVersionOpt = stateStore.expandIsr(controllerEpoch, newLeaderAndIsr)
-    maybeUpdateIsrAndVersion(newIsr, zkVersionOpt)
+    //val zkVersionOpt = stateStore.expandIsr(controllerEpoch, newLeaderAndIsr)
+    //maybeUpdateIsrAndVersion(newIsr, zkVersionOpt)
+    alterIsrChannelManager.enqueueIsrUpdate(AlterIsrItem(topicPartition, newLeaderAndIsr, handleIsrUpdate))
+    alterIsrInFlight = true
+
+    info("ISR updated to [%s]".format(newIsr.mkString(",")))
+    inSyncReplicaIds = newIsr
   }
 
   private[cluster] def shrinkIsr(newIsr: Set[Int]): Unit = {
     val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.toList, zkVersion)
-    val zkVersionOpt = stateStore.shrinkIsr(controllerEpoch, newLeaderAndIsr)
-    maybeUpdateIsrAndVersion(newIsr, zkVersionOpt)
+    //val zkVersionOpt = stateStore.shrinkIsr(controllerEpoch, newLeaderAndIsr)
+    //maybeUpdateIsrAndVersion(newIsr, zkVersionOpt)
+    alterIsrChannelManager.enqueueIsrUpdate(AlterIsrItem(topicPartition, newLeaderAndIsr, handleIsrUpdate))
+    alterIsrInFlight = true
   }
 
-  private[cluster] def maybeUpdateIsrAndVersion(isr: Set[Int], zkVersionOpt: Option[Int]): Unit = {
+  private def handleIsrUpdate(error: Errors): Unit = {
+    error match {
+      case NONE => println(s"Controller accepted ISR for $topicPartition")
+      case _ => println(s"Had an error sending ISR for $topicPartition : $error")
+    }
+    alterIsrInFlight = false
+  }

Review comment:
   It seems like we need to set the `inSyncReplicaIds` here, since we don't 
do it in `shrinkIsr`.
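A minimal sketch of the suggestion, in Java rather than the PR's Scala: the proposed ISR is remembered when the update is enqueued, and `inSyncReplicaIds` is only replaced inside the completion callback, and only when the controller reports no error. `IsrError`, `PartitionIsrSketch` and returning the callback from `shrinkIsr` are hypothetical simplifications (the PR passes `handleIsrUpdate` inside an `AlterIsrItem`), and the sketch assumes the callback runs on the same thread, ignoring locking.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical stand-in for the Errors enum used in the diff.
enum IsrError { NONE, INVALID_REQUEST }

// Hypothetical partition state that commits a new ISR only after the controller accepts it.
class PartitionIsrSketch {
    private Set<Integer> inSyncReplicaIds;
    private Set<Integer> pendingIsr = null; // ISR sent to the controller, not yet confirmed
    private boolean alterIsrInFlight = false;

    PartitionIsrSketch(Set<Integer> initialIsr) {
        this.inSyncReplicaIds = Collections.unmodifiableSet(new HashSet<>(initialIsr));
    }

    Set<Integer> inSyncReplicaIds() { return inSyncReplicaIds; }

    boolean alterIsrInFlight() { return alterIsrInFlight; }

    // Like shrinkIsr in the diff: enqueue the update but leave inSyncReplicaIds untouched.
    // Returns the completion callback so a caller can simulate the controller's answer.
    Consumer<IsrError> shrinkIsr(Set<Integer> newIsr) {
        pendingIsr = Collections.unmodifiableSet(new HashSet<>(newIsr));
        alterIsrInFlight = true;
        return this::handleIsrUpdate;
    }

    // Completion callback: this is where inSyncReplicaIds gets set, on success only.
    private void handleIsrUpdate(IsrError error) {
        if (error == IsrError.NONE && pendingIsr != null) {
            inSyncReplicaIds = pendingIsr;
        }
        pendingIsr = null;
        alterIsrInFlight = false;
    }
}
```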





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-05 Thread GitBox


cmccabe commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r465711962



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1771,6 +1775,127 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  // TODO is it okay to pull message classes down into the controller?
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: AlterIsrResponseData => Unit): Unit = {
+    //val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(alterIsrRequest.brokerId())
+    /*if (brokerEpochOpt.isEmpty) {
+      info(s"Ignoring AlterIsr due to unknown broker ${alterIsrRequest.brokerId()}")
+      // TODO is INVALID_REQUEST a reasonable error here?
+      callback.apply(new AlterIsrResponseData().setErrorCode(Errors.INVALID_REQUEST.code))
+      return
+    }
+
+    if (!brokerEpochOpt.contains(alterIsrRequest.brokerEpoch())) {
+      info(s"Ignoring AlterIsr due to stale broker epoch ${alterIsrRequest.brokerEpoch()} for broker ${alterIsrRequest.brokerId()}")
+      callback.apply(new AlterIsrResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code))
+      return
+    }*/
+
+    val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+    val resp = new AlterIsrResponseData()
+    resp.setTopics(new util.ArrayList())
+
+    alterIsrRequest.topics().forEach(topicReq => {
+      val topicResp = new AlterIsrResponseTopics()
+        .setName(topicReq.name())
+        .setPartitions(new util.ArrayList())
+      resp.topics().add(topicResp)
+
+      topicReq.partitions().forEach(partitionReq => {
+        val partitionResp = new AlterIsrResponsePartitions()
+          .setPartitionIndex(partitionReq.partitionIndex())
+        topicResp.partitions().add(partitionResp)
+
+        // For each partition who's ISR we are altering, let's do some upfront validation for the broker response

Review comment:
   This is also a concurrency bug, since you can't access state like 
`controllerContext` except from the controller thread itself (otherwise it 
would be multi-threaded access without a lock).
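One way to address this, and the approach the later revision of this PR takes via `eventManager.put(AlterIsrReceived(...))`, is to hand the work to the controller thread instead of reading controller state on the request-handler thread. Below is a rough Java sketch of that idea, using a single-threaded `ExecutorService` as a stand-in for the controller's event queue; the class, the plain `HashMap` standing in for `liveBrokerIdAndEpochs`, and the string results are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: every read and write of controller state runs on one "controller thread".
class ControllerEventLoopSketch implements AutoCloseable {
    // Stand-in for controllerContext.liveBrokerIdAndEpochs; deliberately NOT thread-safe.
    private final Map<Integer, Long> liveBrokerEpochs = new HashMap<>();
    private final ExecutorService controllerThread = Executors.newSingleThreadExecutor();

    // Broker registration is also applied on the controller thread.
    CompletableFuture<Void> registerBroker(int brokerId, long epoch) {
        return CompletableFuture.runAsync(() -> liveBrokerEpochs.put(brokerId, epoch), controllerThread);
    }

    // Called from a request-handler thread: it only enqueues work and never reads the map itself.
    CompletableFuture<String> alterIsr(int brokerId, long brokerEpoch) {
        return CompletableFuture.supplyAsync(() -> {
            Long current = liveBrokerEpochs.get(brokerId);
            if (current == null || current != brokerEpoch) {
                return "STALE_BROKER_EPOCH";
            }
            return "NONE";
        }, controllerThread);
    }

    @Override
    public void close() {
        controllerThread.shutdown();
    }
}
```

Because all access to the map happens on the single executor thread, the map itself needs no lock.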









[GitHub] [kafka] dongjinleekr commented on pull request #8117: KAFKA-8403: Suppress needs a Materialized variant

2020-08-05 Thread GitBox


dongjinleekr commented on pull request #8117:
URL: https://github.com/apache/kafka/pull/8117#issuecomment-669182284


   Hello. I updated the 
[KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-508%3A+Make+Suppression+State+Queriable)
 and rebased the PR onto the latest trunk. Please have a look when you are free.
   
   cc/ @vvcephei @mjsax







[GitHub] [kafka] cmccabe commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-05 Thread GitBox


cmccabe commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r465712943



##
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##
@@ -113,8 +113,8 @@ class BrokerToControllerChannelManager(metadataCache: kafka.server.MetadataCache
       brokerToControllerListenerName, time, threadName)
   }
 
-  private[server] def sendRequest(request: AbstractRequest.Builder[_ <: AbstractRequest],
-                                  callback: RequestCompletionHandler): Unit = {
+  def sendRequest(request: AbstractRequest.Builder[_ <: AbstractRequest],
+                  callback: RequestCompletionHandler): Unit = {
     requestQueue.put(BrokerToControllerQueueItem(request, callback))

Review comment:
   This also needs to call `NetworkClient#wakeup` in case we are blocking 
inside `NetworkClient#poll`.
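Why the wakeup matters can be illustrated with `java.nio.channels.Selector`, assuming `NetworkClient#poll` blocks in a similar way (it delegates to Kafka's own selector): a request placed on the queue is not noticed until the blocked `select` returns. The class below is a hypothetical sketch, not `BrokerToControllerChannelManager` itself; running a `Runnable` stands in for actually sending a request.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: the I/O thread blocks in Selector#select, so enqueuing alone is not enough.
class WakeupSenderSketch implements AutoCloseable {
    private final BlockingQueue<Runnable> requestQueue = new LinkedBlockingQueue<>();
    private final Selector selector;
    private final Thread ioThread;
    private volatile boolean running = true;

    WakeupSenderSketch() throws IOException {
        selector = Selector.open();
        ioThread = new Thread(this::runLoop, "sender-sketch");
        ioThread.start();
    }

    private void runLoop() {
        try {
            while (running) {
                Runnable request;
                while ((request = requestQueue.poll()) != null) {
                    request.run(); // stand-in for sending the queued request
                }
                // Blocks for up to 30 seconds unless another thread calls selector.wakeup().
                selector.select(30_000);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    void sendRequest(Runnable request) {
        requestQueue.add(request);
        // Without this call the request could sit in the queue until select() times out.
        selector.wakeup();
    }

    @Override
    public void close() throws IOException, InterruptedException {
        running = false;
        selector.wakeup();
        ioThread.join();
        selector.close();
    }
}
```

If `wakeup()` is called while no `select` is in progress, the next `select` returns immediately, so the enqueue-then-wake order in `sendRequest` does not lose requests.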









[GitHub] [kafka] cmccabe commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-05 Thread GitBox


cmccabe commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r465715770



##
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##
@@ -0,0 +1,121 @@
+package kafka.server

Review comment:
   It would be good to find a better name for this. When I read 
"AlterIsrChannelManager", I assumed it had its own separate channel, rather than 
using the ControllerChannelManager.









[GitHub] [kafka] dongjinleekr commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2

2020-08-05 Thread GitBox


dongjinleekr commented on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-669229785


   Here is the KIP - [KIP-653: Upgrade log4j to 
log4j2](https://cwiki.apache.org/confluence/display/KAFKA/KIP-653%3A+Upgrade+log4j+to+log4j2)
   
   @omkreddy Could you have a look? I thought you would be the perfect 
reviewer [for this feature](https://github.com/omkreddy/log4j2-kafka-appender). 
:smile:







[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-05 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r465756356



##
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean reverseIteratorImplemented = false;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+//catch unsupported operation error
+droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics);
+windowStore = (Timesta

[GitHub] [kafka] ijuma commented on pull request #9129: MINOR: Update jmh to 1.24 for async profiler support

2020-08-05 Thread GitBox


ijuma commented on pull request #9129:
URL: https://github.com/apache/kafka/pull/9129#issuecomment-669221375


   Actually, it looks like I need to improve the jmh script to make this easier 
to use. I will also update the README, so please don't review it yet.







[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-05 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r465743922



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1771,6 +1775,127 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  // TODO is it okay to pull message classes down into the controller?
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: AlterIsrResponseData => Unit): Unit = {
+    //val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(alterIsrRequest.brokerId())
+    /*if (brokerEpochOpt.isEmpty) {
+      info(s"Ignoring AlterIsr due to unknown broker ${alterIsrRequest.brokerId()}")
+      // TODO is INVALID_REQUEST a reasonable error here?
+      callback.apply(new AlterIsrResponseData().setErrorCode(Errors.INVALID_REQUEST.code))
+      return
+    }
+
+    if (!brokerEpochOpt.contains(alterIsrRequest.brokerEpoch())) {
+      info(s"Ignoring AlterIsr due to stale broker epoch ${alterIsrRequest.brokerEpoch()} for broker ${alterIsrRequest.brokerId()}")
+      callback.apply(new AlterIsrResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code))
+      return
+    }*/
+
+    val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+    val resp = new AlterIsrResponseData()
+    resp.setTopics(new util.ArrayList())
+
+    alterIsrRequest.topics().forEach(topicReq => {
+      val topicResp = new AlterIsrResponseTopics()
+        .setName(topicReq.name())
+        .setPartitions(new util.ArrayList())
+      resp.topics().add(topicResp)
+
+      topicReq.partitions().forEach(partitionReq => {
+        val partitionResp = new AlterIsrResponsePartitions()
+          .setPartitionIndex(partitionReq.partitionIndex())
+        topicResp.partitions().add(partitionResp)
+
+        // For each partition who's ISR we are altering, let's do some upfront validation for the broker response

Review comment:
   @cmccabe good to know about `controllerContext` 👍 









[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-05 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r465744271



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) {
         return new AlterClientQuotasResult(Collections.unmodifiableMap(futures));
     }
 
+    @Override
+    public DescribeUserScramCredentialsResult describeUserScramCredentials(List users, DescribeUserScramCredentialsOptions options) {
+        final KafkaFutureImpl> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()),
+                new ControllerNodeProvider()) {
+            @Override
+            public DescribeUserScramCredentialsRequest.Builder createRequest(int timeoutMs) {
+                return new DescribeUserScramCredentialsRequest.Builder(
+                        new DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user ->
+                                new DescribeUserScramCredentialsRequestData.UserName().setName(user)).collect(Collectors.toList())));
+            }
+
+            @Override
+            public void handleResponse(AbstractResponse abstractResponse) {
+                DescribeUserScramCredentialsResponse response = (DescribeUserScramCredentialsResponse) abstractResponse;
+                Errors error = Errors.forCode(response.data().error());
+                switch (error) {
+                    case NONE:
+                        DescribeUserScramCredentialsResponseData data = response.data();
+                        future.complete(data.userScramCredentials().stream().collect(Collectors.toMap(
+                                DescribeUserScramCredentialsResponseData.UserScramCredential::name,
+                                userScramCredential -> {
+                                    List scramCredentialInfos = userScramCredential.credentialInfos().stream().map(
+                                            credentialInfo -> new ScramCredentialInfo(ScramMechanism.from(credentialInfo.mechanism()), credentialInfo.iterations()))
+                                            .collect(Collectors.toList());
+                                    return new UserScramCredentialsDescription(userScramCredential.name(), scramCredentialInfos);
+                                })));
+                        break;
+                    case NOT_CONTROLLER:
+                        handleNotControllerError(error);
+                        break;
+                    default:
+                        future.completeExceptionally(new ApiError(error, response.data().errorMessage()).exception());
+                        break;
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                future.completeExceptionally(throwable);
+            }
+        };
+        runnable.call(call, now);
+        return new DescribeUserScramCredentialsResult(future);
+    }
+
+    @Override
+    public AlterUserScramCredentialsResult alterUserScramCredentials(List alterations,
+                                                                     AlterUserScramCredentialsOptions options) {
+        final long now = time.milliseconds();
+        final Map> futures = new HashMap<>();
+        for (UserScramCredentialAlteration alteration: alterations) {
+            futures.put(alteration.getUser(), new KafkaFutureImpl<>());
+        }
+        final Map userIllegalAlterationExceptions = new HashMap<>();
+        // We need to keep track of users with deletions of an unknown SCRAM mechanism
+        alterations.stream().filter(a -> a instanceof UserScramCredentialDeletion).forEach(alteration -> {
+            UserScramCredentialDeletion deletion = (UserScramCredentialDeletion) alteration;
+            ScramMechanism mechanism = deletion.getMechanism();
+            if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) {
+                userIllegalAlterationExceptions.put(deletion.getUser(), new IllegalArgumentException("Unknown SCRAM mechanism"));

Review comment:
   How about `InvalidRequestException`?  It's already used in this class, 
and it might be more appropriate than `InvalidConfigurationException`.
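For reference, a simplified sketch of the up-front check in this hunk, using a stand-in for `InvalidRequestException` as suggested: each user gets a future, and a user with any deletion naming a `null` or `UNKNOWN` mechanism has that future failed before anything is sent. All types here (`InvalidRequestExceptionStub`, `ScramMechanismStub`, `ScramDeletionStub`, `ScramDeletionValidator`) are hypothetical, `CompletableFuture` stands in for `KafkaFutureImpl`, and unlike the diff, the sketch fails the future directly instead of first collecting exceptions in a map.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for org.apache.kafka.common.errors.InvalidRequestException.
class InvalidRequestExceptionStub extends RuntimeException {
    InvalidRequestExceptionStub(String message) {
        super(message);
    }
}

// Hypothetical, simplified mechanism enum and deletion type.
enum ScramMechanismStub { UNKNOWN, SCRAM_SHA_256, SCRAM_SHA_512 }

class ScramDeletionStub {
    final String user;
    final ScramMechanismStub mechanism; // may be null

    ScramDeletionStub(String user, ScramMechanismStub mechanism) {
        this.user = user;
        this.mechanism = mechanism;
    }
}

class ScramDeletionValidator {
    // Creates one future per user and fails it immediately if any of that user's
    // deletions names a null or UNKNOWN mechanism; other users' futures stay pending.
    static Map<String, CompletableFuture<Void>> failUnknownMechanisms(List<ScramDeletionStub> deletions) {
        Map<String, CompletableFuture<Void>> futures = new HashMap<>();
        for (ScramDeletionStub d : deletions) {
            futures.putIfAbsent(d.user, new CompletableFuture<>());
        }
        for (ScramDeletionStub d : deletions) {
            if (d.mechanism == null || d.mechanism == ScramMechanismStub.UNKNOWN) {
                futures.get(d.user).completeExceptionally(
                        new InvalidRequestExceptionStub("Unknown SCRAM mechanism"));
            }
        }
        return futures;
    }
}
```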









[GitHub] [kafka] johnthotekat commented on pull request #9120: KAFKA-10316: Consider renaming getter method for Interactive Queries

2020-08-05 Thread GitBox


johnthotekat commented on pull request #9120:
URL: https://github.com/apache/kafka/pull/9120#issuecomment-669241684


   @mjsax One of the tests failed for me locally: 
`KafkaAdminClientTest#testMetadataRetries`. 
   It's unrelated to the changes in this PR; the rest of the tests passed 
without any issues. 
   
   I guess it's the same issue we are addressing in 
https://issues.apache.org/jira/browse/KAFKA-10311.
   ```
   org.apache.kafka.clients.admin.KafkaAdminClientTest > testMetadataRetries FAILED
       java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, deadlineMs=1596638067201, tries=1, nextAllowedTryMs=1596638067302) timed out at 1596638067202 after 1 attempt(s)
           at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
           at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
           at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
           at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
           at org.apache.kafka.clients.admin.KafkaAdminClientTest.testMetadataRetries(KafkaAdminClientTest.java:997)
   
           Caused by:
           org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, deadlineMs=1596638067201, tries=1, nextAllowedTryMs=1596638067302) timed out at 1596638067202 after 1 attempt(s)
   
               Caused by:
               org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
   ```







[GitHub] [kafka] johnthotekat edited a comment on pull request #9120: KAFKA-10316: Consider renaming getter method for Interactive Queries

2020-08-05 Thread GitBox


johnthotekat edited a comment on pull request #9120:
URL: https://github.com/apache/kafka/pull/9120#issuecomment-669241684


   @mjsax One of the tests failed for me locally: 
`KafkaAdminClientTest#testMetadataRetries`. 
   It's unrelated to the changes in this PR; the rest of the tests passed 
without any issues. 
   
   I guess it's the same issue we are addressing in 
https://issues.apache.org/jira/browse/KAFKA-10311. There is also an open PR 
for it. 
   ```
   org.apache.kafka.clients.admin.KafkaAdminClientTest > testMetadataRetries FAILED
       java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, deadlineMs=1596638067201, tries=1, nextAllowedTryMs=1596638067302) timed out at 1596638067202 after 1 attempt(s)
           at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
           at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
           at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
           at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
           at org.apache.kafka.clients.admin.KafkaAdminClientTest.testMetadataRetries(KafkaAdminClientTest.java:997)
   
           Caused by:
           org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, deadlineMs=1596638067201, tries=1, nextAllowedTryMs=1596638067302) timed out at 1596638067202 after 1 attempt(s)
   
               Caused by:
               org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
   ```







[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-05 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r465746162



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1771,6 +1776,141 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: AlterIsrResponseData => Unit): Unit = {
+    val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+    alterIsrRequest.topics().forEach(topicReq => topicReq.partitions().forEach(partitionReq => {

Review comment:
   It might be simpler just to use AlterIsrRequestData and 
AlterIsrResponseData throughout this code (rather than converting to 
`Map[TopicPartition, LeaderAndIsr]` and `Map[TopicPartition, Errors]`).
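A rough Java sketch of this suggestion: walk the request's topic/partition nesting once and build the response with the same nesting, instead of flattening into a map keyed by `TopicPartition` and regrouping by topic afterwards (as `responseCallback` does with `groupBy`). The request/response classes are hypothetical simplifications of the generated `AlterIsrRequestData`/`AlterIsrResponseData`, and the per-partition validation is passed in as a function.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical, simplified stand-ins for the generated AlterIsr request/response classes.
class AlterIsrPartitionReq {
    final int partitionIndex;
    final int leaderId;

    AlterIsrPartitionReq(int partitionIndex, int leaderId) {
        this.partitionIndex = partitionIndex;
        this.leaderId = leaderId;
    }
}

class AlterIsrTopicReq {
    final String name;
    final List<AlterIsrPartitionReq> partitions;

    AlterIsrTopicReq(String name, List<AlterIsrPartitionReq> partitions) {
        this.name = name;
        this.partitions = partitions;
    }
}

class AlterIsrPartitionResp {
    final int partitionIndex;
    final short errorCode;

    AlterIsrPartitionResp(int partitionIndex, short errorCode) {
        this.partitionIndex = partitionIndex;
        this.errorCode = errorCode;
    }
}

class AlterIsrTopicResp {
    final String name;
    final List<AlterIsrPartitionResp> partitions = new ArrayList<>();

    AlterIsrTopicResp(String name) {
        this.name = name;
    }
}

class AlterIsrResponder {
    // Mirrors the request's topic -> partition nesting directly in the response,
    // so no intermediate Map<TopicPartition, ...> is built and regrouped.
    static List<AlterIsrTopicResp> respond(List<AlterIsrTopicReq> request,
                                           BiFunction<String, AlterIsrPartitionReq, Short> validate) {
        List<AlterIsrTopicResp> response = new ArrayList<>();
        for (AlterIsrTopicReq topic : request) {
            AlterIsrTopicResp topicResp = new AlterIsrTopicResp(topic.name);
            response.add(topicResp);
            for (AlterIsrPartitionReq partition : topic.partitions) {
                short error = validate.apply(topic.name, partition);
                topicResp.partitions.add(new AlterIsrPartitionResp(partition.partitionIndex, error));
            }
        }
        return response;
    }
}
```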









[GitHub] [kafka] ijuma opened a new pull request #9129: MINOR: Update jmh to 1.24 for async profiler support

2020-08-05 Thread GitBox


ijuma opened a new pull request #9129:
URL: https://github.com/apache/kafka/pull/9129


   Highlights:
   
   * async-profiler integration. It can be used with `-prof async`; pass 
`-prof async:help` to list the accepted options.
   * perf c2c [2] integration. It can be used with `-prof perfc2c`, if 
available.
   
   Full details: 
https://mail.openjdk.java.net/pipermail/jmh-dev/2020-August/002982.html
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] johnthotekat edited a comment on pull request #9120: KAFKA-10316: Consider renaming getter method for Interactive Queries

2020-08-05 Thread GitBox


johnthotekat edited a comment on pull request #9120:
URL: https://github.com/apache/kafka/pull/9120#issuecomment-669241684


   @mjsax One of the tests failed for me locally: 
`KafkaAdminClientTest#testMetadataRetries`. 
   It's unrelated to the changes in this PR; the rest of the tests passed 
without any issues. 
   
   I guess it's the same issue we are addressing in 
https://issues.apache.org/jira/browse/KAFKA-10311. There is also a PR open 
for it. 
   ```
   org.apache.kafka.clients.admin.KafkaAdminClientTest > testMetadataRetries FAILED
       java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, deadlineMs=1596638067201, tries=1, nextAllowedTryMs=1596638067302) timed out at 1596638067202 after 1 attempt(s)
           at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
           at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
           at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
           at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
           at org.apache.kafka.clients.admin.KafkaAdminClientTest.testMetadataRetries(KafkaAdminClientTest.java:997)
   
           Caused by:
           org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, deadlineMs=1596638067201, tries=1, nextAllowedTryMs=1596638067302) timed out at 1596638067202 after 1 attempt(s)
   
               Caused by:
               org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
   ```







[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-05 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r465748504



##
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##
@@ -0,0 +1,121 @@
+package kafka.server

Review comment:
   Yea, maybe just "AlterIsrManager"? 









[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-05 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r465748120



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1771,6 +1776,141 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: AlterIsrResponseData => Unit): Unit = {
+    val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+    alterIsrRequest.topics().forEach(topicReq => topicReq.partitions().forEach(partitionReq => {
+      val tp = new TopicPartition(topicReq.name(), partitionReq.partitionIndex())
+      val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt)
+      isrsToAlter.put(tp, new LeaderAndIsr(partitionReq.leaderId(), partitionReq.leaderEpoch(), newIsr, partitionReq.currentIsrVersion()))
+    }))
+
+    def responseCallback(results: Either[Map[TopicPartition, Errors], Errors]): Unit = {
+      val resp = new AlterIsrResponseData()
+      results match {
+        case Right(error) =>
+          resp.setErrorCode(error.code())
+        case Left(partitions: Map[TopicPartition, Errors]) =>
+          resp.setTopics(new util.ArrayList())
+          partitions.groupBy(_._1.topic()).foreachEntry((topic, partitionMap) => {
+            val topicResp = new AlterIsrResponseTopics()
+              .setName(topic)
+              .setPartitions(new util.ArrayList())
+            resp.topics().add(topicResp)
+            partitionMap.foreachEntry((partition, error) => {
+              topicResp.partitions().add(
+                new AlterIsrResponsePartitions()
+                  .setPartitionIndex(partition.partition())
+                  .setErrorCode(error.code()))
+            })
+          })
+      }
+      callback.apply(resp)
+    }
+
+    eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId(), alterIsrRequest.brokerEpoch(), isrsToAlter, responseCallback))
+  }
+
+  private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr],
+                              callback: AlterIsrCallback): Unit = {
+    if (!isActive) {
+      callback.apply(Right(Errors.NOT_CONTROLLER))
+      return
+    }
+
+    val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+    if (brokerEpochOpt.isEmpty) {
+      info(s"Ignoring AlterIsr due to unknown broker $brokerId")
+      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      return
+    }
+
+    if (!brokerEpochOpt.contains(brokerEpoch)) {
+      info(s"Ignoring AlterIsr due to stale broker epoch $brokerEpoch for broker $brokerId")
+      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      return
+    }
+
+    val partitionErrors: mutable.Map[TopicPartition, Errors] = mutable.HashMap[TopicPartition, Errors]()
+
+    val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = isrsToAlter.flatMap {
+      case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) =>
+        val partitionError: Errors = controllerContext.partitionLeadershipInfo(tp) match {
+          case Some(leaderIsrAndControllerEpoch) =>
+            val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
+            if (newLeaderAndIsr.leader != currentLeaderAndIsr.leader) {
+              Errors.NOT_LEADER_OR_FOLLOWER
+            } else if (newLeaderAndIsr.leaderEpoch < currentLeaderAndIsr.leaderEpoch) {
+              Errors.FENCED_LEADER_EPOCH
+            } else {
+              val currentAssignment = controllerContext.partitionReplicaAssignment(tp)
+              if (!newLeaderAndIsr.isr.forall(replicaId => currentAssignment.contains(replicaId))) {
+                warn(s"Some of the proposed ISR are not in the assignment for partition $tp. Proposed ISR=$newLeaderAndIsr.isr assignment=$currentAssignment")
+                Errors.INVALID_REQUEST
+              } else if (!newLeaderAndIsr.isr.forall(replicaId => controllerContext.isReplicaOnline(replicaId, tp))) {
+                warn(s"Some of the proposed ISR are offline for partition $tp. Proposed ISR=$newLeaderAndIsr.isr")
+                Errors.INVALID_REQUEST
+              } else {
+                Errors.NONE
+              }
+            }
+          case None => Errors.UNKNOWN_TOPIC_OR_PARTITION
+        }
+        if (partitionError == Errors.NONE) {
+          // Bump the leaderEpoch for partitions that we're going to write
+          Some(tp -> newLeaderAndIsr.newEpochAndZkVersion)
+        } else {
+          partitionErrors.put(tp, partitionError)
+          None
+        }
+    }
+
+    // Do the updates in ZK
+    info(s"Updating ISRs for partitions: ${adjustedIsrs.keySet}.")
+    val UpdateLeaderAndIsrResult(finishedUpdates, badVersionUpdates) = zkClient.updateLeaderAndIsr(
+      adjustedIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
+
+    val successfulUpdates: Map[TopicPartition, LeaderAndIsr] = finishedUpdates.flatMap {
+      case (partition

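For readers following the controller side of this patch: `processAlterIsr` checks each proposed partition update in a fixed order (leadership info exists, the sender is the current leader, the leader epoch is not stale, every proposed ISR member is in the assignment, every proposed ISR member is online), and only partitions that pass are written to ZooKeeper. The Java sketch below mirrors that ordering with hypothetical stand-in types (`Err`, `PartitionState`, `Proposal`, `AlterIsrValidation`, using records, so Java 16 or newer); it is not Kafka's API and leaves out the epoch bump and the ZooKeeper write.

```java
import java.util.List;
import java.util.Set;

// Hypothetical stand-ins for the Errors values used in processAlterIsr.
enum Err { NONE, NOT_LEADER_OR_FOLLOWER, FENCED_LEADER_EPOCH, INVALID_REQUEST, UNKNOWN_TOPIC_OR_PARTITION }

// Hypothetical snapshot of what the controller knows about one partition.
record PartitionState(int leader, int leaderEpoch, List<Integer> assignment, Set<Integer> onlineReplicas) {}

// Hypothetical proposal sent by the partition leader.
record Proposal(int leader, int leaderEpoch, List<Integer> newIsr) {}

final class AlterIsrValidation {
    private AlterIsrValidation() {}

    /** Returns the error for one partition, checking conditions in the same order as the Scala code. */
    static Err validate(PartitionState current, Proposal proposal) {
        if (current == null) {
            return Err.UNKNOWN_TOPIC_OR_PARTITION;  // no leadership info for this partition
        }
        if (proposal.leader() != current.leader()) {
            return Err.NOT_LEADER_OR_FOLLOWER;      // sender is not the current leader
        }
        if (proposal.leaderEpoch() < current.leaderEpoch()) {
            return Err.FENCED_LEADER_EPOCH;         // sender's leader epoch is stale
        }
        if (!current.assignment().containsAll(proposal.newIsr())) {
            return Err.INVALID_REQUEST;             // ISR names a replica outside the assignment
        }
        if (!current.onlineReplicas().containsAll(proposal.newIsr())) {
            return Err.INVALID_REQUEST;             // ISR names an offline replica
        }
        return Err.NONE;
    }
}
```
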
[jira] [Commented] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics

2020-08-05 Thread Ryanne Dolan (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17171519#comment-17171519 ]

Ryanne Dolan commented on KAFKA-10339:
--

I think what you mean is that a read-process-write loop cannot span clusters, 
since a transaction coordinator on one cluster cannot commit offsets on another 
cluster. But I don't think we actually need that -- we can just store offsets 
on the target cluster instead.

I think what we need is something along these lines:

- we manage offsets ourselves -- we don't rely on Connect's internal offsets 
tracking or __consumer_offsets on the source cluster.
- we only write to the target cluster.
- offsets are stored on the target cluster using a "fake" consumer group. I say 
"fake" because there would be no actual records being consumed by the group, 
just offsets being stored in __consumer_offsets topic.
- we write all records in a transaction, just as the KIP currently describes.
- in addition, we call addOffsetsToTransaction in order to commit offsets to 
the "fake" consumer group on the target cluster.
- when MirrorSourceTask starts, it loads initial offsets from 
__consumer_offsets on the target cluster.

Result:
- if the transaction succeeds, the __consumer_offsets topic on the target 
cluster is updated.
- if the transaction aborts, all data records are dropped, and the 
__consumer_offsets topic is not updated.
- when MirrorSourceTask starts/restarts, it resumes at the last committed 
offsets, as recorded in the target cluster.

Thoughts?

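A toy model of the proposal above, assuming an in-memory stand-in for the target cluster: records and the "fake" group's offsets are buffered together and become visible only on commit, an abort drops both, and a restarted task reads its starting offset from the target side. In the Java producer the offset step corresponds to `KafkaProducer.sendOffsetsToTransaction` (which issues the AddOffsetsToTxn request); the classes here (`TargetCluster`, `MirrorTransaction`, `MirrorTaskStartup`) are hypothetical and do not model isolation levels, producer fencing, or broker failures.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the target cluster: committed records plus the "fake" group's offsets. */
final class TargetCluster {
    final List<String> committedRecords = new ArrayList<>();
    final Map<String, Long> committedOffsets = new HashMap<>(); // source partition -> next offset to read
}

/** Hypothetical transaction: buffers records and offsets so both become visible together on commit. */
final class MirrorTransaction {
    private final TargetCluster target;
    private final List<String> pendingRecords = new ArrayList<>();
    private final Map<String, Long> pendingOffsets = new HashMap<>();

    MirrorTransaction(TargetCluster target) {
        this.target = target;
    }

    void send(String record) {
        pendingRecords.add(record);
    }

    // Plays the role of adding the source offsets to the same transaction.
    void addOffsets(String sourcePartition, long nextOffset) {
        pendingOffsets.put(sourcePartition, nextOffset);
    }

    void commit() {
        target.committedRecords.addAll(pendingRecords);
        target.committedOffsets.putAll(pendingOffsets);
        pendingRecords.clear();
        pendingOffsets.clear();
    }

    void abort() {
        // Neither the records nor the offsets reach the target cluster.
        pendingRecords.clear();
        pendingOffsets.clear();
    }
}

final class MirrorTaskStartup {
    private MirrorTaskStartup() {}

    /** On (re)start, resume from the offset stored on the target cluster, defaulting to 0. */
    static long initialOffset(TargetCluster target, String sourcePartition) {
        return target.committedOffsets.getOrDefault(sourcePartition, 0L);
    }
}
```
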
> MirrorMaker2 Exactly-once Semantics
> ---
>
> Key: KAFKA-10339
> URL: https://issues.apache.org/jira/browse/KAFKA-10339
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
>  Labels: needs-kip
>
> MirrorMaker2 is currently implemented on Kafka Connect Framework, more 
> specifically the Source Connector / Task, which do not provide exactly-once 
> semantics (EOS) out-of-the-box, as discussed in 
> https://github.com/confluentinc/kafka-connect-jdbc/issues/461,  
> https://github.com/apache/kafka/pull/5553, 
> https://issues.apache.org/jira/browse/KAFKA-6080  and 
> https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 
> currently does not provide EOS.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-05 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r465749546



##
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##
@@ -0,0 +1,132 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 6L
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr)

Review comment:
   Should probably get rid of this and change the method to 
`enqueueIsrUpdate(TopicPartition, LeaderAndIsr)`

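A minimal sketch of the suggested signature, assuming simplified stand-ins for `TopicPartition` and `LeaderAndIsr` (Java records here, not the Kafka classes): because pending updates are already keyed by partition, passing the partition and the state directly avoids storing the partition a second time inside a wrapper such as `AlterIsrItem`. `PendingIsrUpdates` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-ins, NOT Kafka's TopicPartition / LeaderAndIsr classes.
record TopicPartition(String topic, int partition) {}
record LeaderAndIsr(int leader, int leaderEpoch, List<Integer> isr, int zkVersion) {}

/** Hypothetical holder illustrating enqueueIsrUpdate(TopicPartition, LeaderAndIsr). */
final class PendingIsrUpdates {
    private final Map<TopicPartition, LeaderAndIsr> pending = new HashMap<>();

    // The partition is the map key, so no wrapper object is needed; a newer update replaces an older one.
    synchronized void enqueueIsrUpdate(TopicPartition tp, LeaderAndIsr leaderAndIsr) {
        pending.put(tp, leaderAndIsr);
    }

    // Mirrors clearPending: drop any queued update when a new LeaderAndIsr arrives for the partition.
    synchronized void clearPending(TopicPartition tp) {
        pending.remove(tp);
    }

    synchronized Map<TopicPartition, LeaderAndIsr> snapshot() {
        return Map.copyOf(pending);
    }
}
```
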




This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-05 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r465750445



##
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##
@@ -0,0 +1,132 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 6L
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr)
+
+class AlterIsrChannelManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager,
+                                 val zkClient: KafkaZkClient,
+                                 val scheduler: Scheduler,
+                                 val brokerId: Int,
+                                 val brokerEpoch: Long) extends AlterIsrChannelManager with Logging with KafkaMetricsGroup {
+
+  private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = new mutable.HashMap[TopicPartition, AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+    pendingIsrUpdates synchronized {
+      pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem
+      lastIsrChangeMs.set(System.currentTimeMillis())
+      // Rather than sending right away, we'll delay at most 50ms to allow for batching of ISR changes happening
+      // in fast succession
+      if (scheduledRequest.isEmpty) {
+        scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS))
+      }
+    }
+  }
+
+  override def clearPending(topicPartition: TopicPartition): Unit = {
+    pendingIsrUpdates synchronized {
+      // when we get a new LeaderAndIsr, we clear out any pending requests
+      pendingIsrUpdates.remove(topicPartition)
+    }
+  }
+
+  override def startup(): Unit = {
+    controllerChannelManager.start()
+  }
+
+  override def shutdown(): Unit = {
+    controllerChannelManager.shutdown()
+  }
+
+  private def propagateIsrChanges(): Unit = {
+    val now = System.currentTimeMillis()
+    pendingIsrUpdates synchronized {
+      if (pendingIsrUpdates.nonEmpty) {
+        // Max ISRs to send?
+        val message = new AlterIsrRequestData()
+          .setBrokerId(brokerId)
+          .setBrokerEpoch(brokerEpoch)
+          .setTopics(new util.ArrayList())
+
+        pendingIsrUpdates.values.groupBy(_.topicPartition.topic()).foreachEntry((topic, items) => {
+          val topicPart = new AlterIsrRequestTopics()
+            .setName(topic)
+            .setPartitions(new util.ArrayList())
+          message.topics().add(topicPart)
+          items.foreach(item => {
+            topicPart.partitions().add(new AlterIsrRequestPartitions()
+              .setPartitionIndex(item.topicPartition.partition())
+              .setLeaderId(item.leaderAndIsr.leader)
+              .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
+              .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
+              .setCurrentIsrVersion(item.leaderAndIsr.zkVersion)
+            )
+          })
+        })
+
+        def responseHandler(response: ClientResponse): Unit = {
+          println(response.responseBody().toString(response.requestHeader().apiVersion()))

Review comment:
   Remove this





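The batching in `enqueueIsrUpdate` (schedule one send at most 50 ms after the first queued change, let later changes join the same batch, keep only the newest state per partition) can be sketched with a `ScheduledExecutorService`. This is a hedged illustration, not the patch: `DebouncedIsrBatcher` and its `sender` hook are hypothetical, partitions are plain strings, and the sketch resets the scheduled task inside the flush, which the excerpt above does not show. It also calls the sender outside the lock, whereas the patch builds the request while holding `pendingIsrUpdates`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical debounced batcher: collects ISR changes and sends them as one batch after a short delay. */
final class DebouncedIsrBatcher {
    private final ScheduledExecutorService scheduler;
    private final long delayMs;
    private final Consumer<Map<String, List<Integer>>> sender; // hypothetical hook standing in for the AlterIsr send
    private final Map<String, List<Integer>> pending = new HashMap<>(); // partition -> newest proposed ISR
    private ScheduledFuture<?> scheduled; // non-null while a flush is pending; guarded by `this`

    DebouncedIsrBatcher(ScheduledExecutorService scheduler, long delayMs,
                        Consumer<Map<String, List<Integer>>> sender) {
        this.scheduler = scheduler;
        this.delayMs = delayMs;
        this.sender = sender;
    }

    synchronized void enqueue(String partition, List<Integer> newIsr) {
        pending.put(partition, newIsr); // a newer change for the same partition replaces the older one
        if (scheduled == null) {
            // Only the first change in a quiet period schedules a flush; later changes ride along.
            scheduled = scheduler.schedule(this::flush, delayMs, TimeUnit.MILLISECONDS);
        }
    }

    private void flush() {
        Map<String, List<Integer>> batch;
        synchronized (this) {
            batch = new HashMap<>(pending);
            pending.clear();
            scheduled = null; // assumption: allow the next change to schedule a new flush
        }
        if (!batch.isEmpty()) {
            sender.accept(batch);
        }
    }
}
```
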




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-05 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r465750632



##
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##
@@ -0,0 +1,132 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 6L
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr)
+
+class AlterIsrChannelManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager,
+                                 val zkClient: KafkaZkClient,
+                                 val scheduler: Scheduler,
+                                 val brokerId: Int,
+                                 val brokerEpoch: Long) extends AlterIsrChannelManager with Logging with KafkaMetricsGroup {
+
+  private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = new mutable.HashMap[TopicPartition, AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+    pendingIsrUpdates synchronized {
+      pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem
+      lastIsrChangeMs.set(System.currentTimeMillis())
+      // Rather than sending right away, we'll delay at most 50ms to allow for batching of ISR changes happening
+      // in fast succession
+      if (scheduledRequest.isEmpty) {
+        scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS))
+      }
+    }
+  }
+
+  override def clearPending(topicPartition: TopicPartition): Unit = {
+    pendingIsrUpdates synchronized {
+      // when we get a new LeaderAndIsr, we clear out any pending requests
+      pendingIsrUpdates.remove(topicPartition)
+    }
+  }
+
+  override def startup(): Unit = {
+    controllerChannelManager.start()
+  }
+
+  override def shutdown(): Unit = {
+    controllerChannelManager.shutdown()
+  }
+
+  private def propagateIsrChanges(): Unit = {
+    val now = System.currentTimeMillis()
+    pendingIsrUpdates synchronized {
+      if (pendingIsrUpdates.nonEmpty) {
+        // Max ISRs to send?
+        val message = new AlterIsrRequestData()
+          .setBrokerId(brokerId)
+          .setBrokerEpoch(brokerEpoch)
+          .setTopics(new util.ArrayList())
+
+        pendingIsrUpdates.values.groupBy(_.topicPartition.topic()).foreachEntry((topic, items) => {
+          val topicPart = new AlterIsrRequestTopics()
+            .setName(topic)
+            .setPartitions(new util.ArrayList())
+          message.topics().add(topicPart)
+          items.foreach(item => {
+            topicPart.partitions().add(new AlterIsrRequestPartitions()
+              .setPartitionIndex(item.topicPartition.partition())
+              .setLeaderId(item.leaderAndIsr.leader)
+              .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
+              .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
+              .setCurrentIsrVersion(item.leaderAndIsr.zkVersion)
+            )
+          })
+        })
+
+        def responseHandler(response: ClientResponse): Unit = {
+          println(response.responseBody().toString(response.requestHeader().apiVersion()))
+          val body: AlterIsrResponse = response.responseBody().asInstanceOf[AlterIsrResponse]
+          val data: AlterIsrResponseData = body.data()
+          Errors.forCode(data.errorCode()) match {
+            case Errors.NONE => info(s"Controller handled AlterIsr request")
+            case e: Errors => warn(s"Controller returned an error when handling AlterIsr request: 

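Both the broker (building `AlterIsrRequestData` in `propagateIsrChanges`) and the controller (building `AlterIsrResponseData` in `responseCallback`) turn a flat per-partition map into a topic -> partitions structure with `groupBy` on the topic name. A Java equivalent using `Collectors.groupingBy`, with hypothetical `PartitionUpdate` and `TopicEntry` records standing in for the generated message classes; topics and partitions are sorted here only to make the output deterministic, which the Scala code does not do.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical flat entry: one pending ISR change for one partition.
record PartitionUpdate(String topic, int partition, int leaderId, int leaderEpoch,
                       List<Integer> newIsr, int currentIsrVersion) {}

// Hypothetical nested shape standing in for AlterIsrRequestTopics -> AlterIsrRequestPartitions.
record TopicEntry(String name, List<PartitionUpdate> partitions) {}

final class AlterIsrRequestShape {
    private AlterIsrRequestShape() {}

    /** Groups per-partition updates into one entry per topic; sorting is only for deterministic output. */
    static List<TopicEntry> groupByTopic(List<PartitionUpdate> updates) {
        TreeMap<String, List<PartitionUpdate>> byTopic = updates.stream()
                .collect(Collectors.groupingBy(PartitionUpdate::topic, TreeMap::new, Collectors.toList()));
        List<TopicEntry> topics = new ArrayList<>();
        byTopic.forEach((topic, parts) -> {
            List<PartitionUpdate> sorted = new ArrayList<>(parts);
            sorted.sort(Comparator.comparingInt(PartitionUpdate::partition));
            topics.add(new TopicEntry(topic, sorted));
        });
        return topics;
    }
}
```
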
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-05 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r465751005



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) {
         return new AlterClientQuotasResult(Collections.unmodifiableMap(futures));
     }
 
+    @Override
+    public DescribeUserScramCredentialsResult describeUserScramCredentials(List<String> users, DescribeUserScramCredentialsOptions options) {
+        final KafkaFutureImpl<Map<String, UserScramCredentialsDescription>> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()),
+                new ControllerNodeProvider()) {
+            @Override
+            public DescribeUserScramCredentialsRequest.Builder createRequest(int timeoutMs) {
+                return new DescribeUserScramCredentialsRequest.Builder(
+                        new DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user ->
+                                new DescribeUserScramCredentialsRequestData.UserName().setName(user)).collect(Collectors.toList()))));
+            }
+
+            @Override
+            public void handleResponse(AbstractResponse abstractResponse) {
+                DescribeUserScramCredentialsResponse response = (DescribeUserScramCredentialsResponse) abstractResponse;
+                Errors error = Errors.forCode(response.data().error());
+                switch (error) {
+                    case NONE:
+                        DescribeUserScramCredentialsResponseData data = response.data();
+                        future.complete(data.userScramCredentials().stream().collect(Collectors.toMap(
+                                DescribeUserScramCredentialsResponseData.UserScramCredential::name,
+                                userScramCredential -> {
+                                    List<ScramCredentialInfo> scramCredentialInfos = userScramCredential.credentialInfos().stream().map(
+                                            credentialInfo -> new ScramCredentialInfo(ScramMechanism.from(credentialInfo.mechanism()), credentialInfo.iterations()))
+                                            .collect(Collectors.toList());
+                                    return new UserScramCredentialsDescription(userScramCredential.name(), scramCredentialInfos);
+                                })));
+                        break;
+                    case NOT_CONTROLLER:
+                        handleNotControllerError(error);
+                        break;
+                    default:
+                        future.completeExceptionally(new ApiError(error, response.data().errorMessage()).exception());
+                        break;
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                future.completeExceptionally(throwable);
+            }
+        };
+        runnable.call(call, now);
+        return new DescribeUserScramCredentialsResult(future);
+    }
+
+    @Override
+    public AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScramCredentialAlteration> alterations,
+                                                                     AlterUserScramCredentialsOptions options) {
+        final long now = time.milliseconds();
+        final Map> futures = new HashMap<>();
+        for (UserScramCredentialAlteration alteration: alterations) {
+            futures.put(alteration.getUser(), new KafkaFutureImpl<>());
+        }
+        final Map userIllegalAlterationExceptions = new HashMap<>();
+        // We need to keep track of users with deletions of an unknown SCRAM mechanism
+        alterations.stream().filter(a -> a instanceof UserScramCredentialDeletion).forEach(alteration -> {
+            UserScramCredentialDeletion deletion = (UserScramCredentialDeletion) alteration;
+            ScramMechanism mechanism = deletion.getMechanism();
+            if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) {
+                userIllegalAlterationExceptions.put(deletion.getUser(), new IllegalArgumentException("Unknown SCRAM mechanism"));
+            }
+        });
+        // Creating an upsertion may throw InvalidKeyException or NoSuchAlgorithmException,
+        // so keep track of which users are affected by such a failure and immediately fail all their alterations
+        final Map> userInsertions = new HashMap<>();
+        alterations.stream().filter(a -> a instanceof UserScramCredentialUpsertion)
+                .filter(alteration -> !userIllegalAlterationExceptions.containsKey(alteration.getUser()))
+                .forEach(alteration -> {
+                    UserScramCredentialUpsertion upsertion = (UserScramCredentialUpsertion) alteration;
+                    String user = upsertion.getUser();
+                    try {
+                        Scr

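In `alterUserScramCredentials` above, a deletion that names a missing or unknown SCRAM mechanism marks that user as failed, and all of that user's alterations are then failed up front instead of being sent. The stdlib-only sketch below captures that rule; `Alteration`, `ScramAlterationPrecheck`, and the use of mechanism names as plain strings are hypothetical simplifications (the patch itself uses `ScramMechanism` and the `UserScramCredentialAlteration` subclasses).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical simplified alteration: user, mechanism name (null if absent), and deletion vs. upsertion.
record Alteration(String user, String mechanism, boolean deletion) {}

final class ScramAlterationPrecheck {
    private ScramAlterationPrecheck() {}

    // The SCRAM mechanisms Kafka supports; anything else counts as unknown in this sketch.
    private static final Set<String> KNOWN = Set.of("SCRAM-SHA-256", "SCRAM-SHA-512");

    /** Returns user -> reason for users with a deletion naming a missing or unknown mechanism. */
    static Map<String, String> illegalUsers(List<Alteration> alterations) {
        Map<String, String> illegal = new LinkedHashMap<>();
        for (Alteration a : alterations) {
            if (a.deletion() && (a.mechanism() == null || !KNOWN.contains(a.mechanism()))) {
                illegal.put(a.user(), "Unknown SCRAM mechanism");
            }
        }
        return illegal;
    }

    /** Alterations that would still be sent: everything belonging to users not already failed. */
    static List<Alteration> toSend(List<Alteration> alterations) {
        Map<String, String> illegal = illegalUsers(alterations);
        List<Alteration> result = new ArrayList<>();
        for (Alteration a : alterations) {
            if (!illegal.containsKey(a.user())) {
                result.add(a);
            }
        }
        return result;
    }
}
```
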
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-05 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r465751597



##
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String storeName;
+    private final SlidingWindows windows;
+    private final Initializer<Agg> initializer;
+    private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+                                         final String storeName,
+                                         final Initializer<Agg> initializer,
+                                         final Aggregator<? super K, ? super V, Agg> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamSlidingWindowAggregateProcessor();
+    }
+
+    public SlidingWindows windows() {
+        return windows;
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+    private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> {
+        private TimestampedWindowStore<K, Agg> windowStore;
+        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+        private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
+        private Sensor droppedRecordsSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+        private boolean reverseIteratorImplemented = false;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            super.init(context);
+            internalProcessorContext = (InternalProcessorContext) context;
+            metrics = internalProcessorContext.metrics();
+            final String threadId = Thread.currentThread().getName();
+            lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+                threadId,
+                context.taskId().toString(),
+                internalProcessorContext.currentNode().name(),
+                metrics
+            );
+            //catch unsupported operation error
+            droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics);
+            windowStore = (Timesta

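For context on what this processor computes: with `SlidingWindows`, two records fall into the same window when their timestamps differ by at most the configured time difference, with both window bounds inclusive. Below is a brute-force Java sketch of the windows that end at each record's timestamp, summing values. It is O(n^2), assumes distinct timestamps, and ignores the windows that start just after each record, grace periods, and late-record dropping, all of which the real processor handles incrementally; `SlidingWindowSums` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

final class SlidingWindowSums {
    private SlidingWindowSums() {}

    /** A [start, end] window (both ends inclusive) and the sum of the values inside it. */
    record WindowSum(long start, long end, long sum) {}

    /** For each record at time t, sums every record whose timestamp lies in [t - timeDifferenceMs, t]. */
    static List<WindowSum> leftWindowSums(long[] timestamps, long[] values, long timeDifferenceMs) {
        List<WindowSum> out = new ArrayList<>();
        for (int i = 0; i < timestamps.length; i++) {
            long end = timestamps[i];
            long start = end - timeDifferenceMs;
            long sum = 0;
            for (int j = 0; j < timestamps.length; j++) {
                if (timestamps[j] >= start && timestamps[j] <= end) {
                    sum += values[j];
                }
            }
            out.add(new WindowSum(start, end, sum));
        }
        return out;
    }
}
```

With timestamps 10, 12, 20, values 1, 2, 4 and a 5 ms time difference, the windows are [5, 10] (sum 1), [7, 12] (sum 3) and [15, 20] (sum 4).
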
[GitHub] [kafka] hachikuji opened a new pull request #9130: [DRAFT] Kafka Raft Implementation (KIP-595)

2020-08-05 Thread GitBox


hachikuji opened a new pull request #9130:
URL: https://github.com/apache/kafka/pull/9130


   This is a draft of the changes needed for KIP-595. The draft tag will be 
removed in the upcoming weeks if/when the KIP is adopted. Note that there are 
still a few significant protocol differences from the documented proposal that 
we are still in the process of reconciling.
   
   Co-authored-by: Boyang Chen 
   Co-authored-by: Guozhang Wang  
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] mumrah commented on pull request #7222: KAFKA-8806 Reduce calls to validateOffsetsIfNeeded

2020-08-05 Thread GitBox


mumrah commented on pull request #7222:
URL: https://github.com/apache/kafka/pull/7222#issuecomment-669259920


   Reviving this PR cc @hachikuji @ijuma @andrewchoi5 







[jira] [Updated] (KAFKA-10334) Transactions not working properly

2020-08-05 Thread Luis Araujo (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-10334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luis Araujo updated KAFKA-10334:

Description: 
I'm using transactions provided by Kafka Producer API in a Scala project built 
with SBT. The dependency used in the project is: 
{code:java}
"org.apache.kafka" % "kafka-clients" % "2.1.0" {code}
I followed the documentation and I was expecting that transactions fail when I 
call *.commitTransaction* if some problem is raised when sending a message like 
it's described in the documentation: 
[https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]

Unfortunately, when testing this behaviour using a message larger than the size 
accepted by the Kafka broker/cluster, the transactions are not working properly.

I tested with a 3 Kafka broker cluster with 1MB message max size (default 
value):
 - when the message has 1MB, the transaction is aborted and an exception is 
raised when calling *commitTransaction()*
 - when the message is bigger than 1MB, the transaction is completed 
successfully *without* the message being written. No exception is thrown.

As an example, this means that when I produce 9 messages with 1 KB and 1 
message with 1.1MB in the same transaction, the transaction is completed but 
only 9 messages are written to the Kafka cluster.

I tested this behaviour with Kafka version 2.1.0 and 2.3.0 in both Kafka 
cluster and Kafka Producer API.

  was:
I'm using transactions provided by Kafka Producer API in a Scala project built 
with SBT. The dependency used in the project is: 

"org.apache.kafka" % "kafka-clients" % "2.1.0"

I followed the documentation and I was expecting that transactions fail when I 
call .commitTransaction if some problem is raised when sending a message like 
it's described in the documentation: 
[https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]

Unfortunatelly, when testing this behaviour using a message larger than the 
size accepted by the Kafka broker/cluster, the transactions are not working 
properly.

I tested with a 3 Kafka broker cluster with 1MB message max size (default 
value):
- when the message has 1MB, the transaction is aborted and an exception is 
raised when calling commitTransaction()
- when the message is bigger than 1MB, the transaction is completed 
successfully without the message being written. no exception is thrown.

As an example, this means that when I produce 9 messages with 1 KB and 1 
message with 1.1MB in the same transaction, the transaction is completed but 
only 9 messages are written to the Kafka cluster.

I tested this behaviour with Kafka version 2.1.0 and 2.3.0 in both Kafka 
cluster and Kafka Producer API.


> Transactions not working properly
> -
>
> Key: KAFKA-10334
> URL: https://issues.apache.org/jira/browse/KAFKA-10334
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 2.1.0, 2.3.0
>Reporter: Luis Araujo
>Priority: Major
>
> I'm using the transactions provided by the Kafka Producer API in a Scala 
> project built with SBT. The dependency used in the project is: 
> {code:java}
> "org.apache.kafka" % "kafka-clients" % "2.1.0" {code}
> I expected *.commitTransaction* to fail if a problem occurred while sending 
> a message, as described in the documentation: 
> [https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]
> Unfortunately, when I tested this behaviour with a message larger than the 
> size accepted by the Kafka broker/cluster, transactions did not work properly.
> I tested with a 3-broker Kafka cluster using the default maximum message 
> size of 1MB:
>  - when the message is exactly 1MB, the transaction is aborted and an 
> exception is raised when calling *commitTransaction()*
>  - when the message is bigger than 1MB, the transaction completes 
> successfully *without* the message being written. No exception is thrown.
> As an example, when I produce 9 messages of 1 KB and 1 message of 1.1MB in 
> the same transaction, the transaction completes but only 9 messages are 
> written to the Kafka cluster.
> I tested this behaviour with Kafka versions 2.1.0 and 2.3.0, for both the 
> Kafka cluster and the Kafka Producer API.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10334) Transactions not working properly

2020-08-05 Thread Luis Araujo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luis Araujo updated KAFKA-10334:

Description: 
I'm using the transactions provided by the Kafka Producer API in a Scala 
project built with SBT. The dependency used in the project is: 
{code:java}
"org.apache.kafka" % "kafka-clients" % "2.1.0" {code}
I expected *.commitTransaction* to fail if a problem occurred while sending a 
message, as described in the 
[documentation|[http://example.com|https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]].

Unfortunately, when I tested this behaviour with a message larger than the size 
accepted by the Kafka broker/cluster, transactions did not work properly.

I tested with a 3-broker Kafka cluster using the default maximum message size 
of 1MB:
 - when the message is exactly 1MB, the transaction is aborted and an exception 
is raised when calling *commitTransaction()*
 - when the message is bigger than 1MB, the transaction completes 
successfully *without* the message being written. No exception is thrown.

As an example, when I produce 9 messages of 1 KB and 1 message of 1.1MB in 
the same transaction, the transaction completes but only 9 messages are 
written to the Kafka cluster.

I tested this behaviour with Kafka versions 2.1.0 and 2.3.0, for both the Kafka 
cluster and the Kafka Producer API.

  was:
I'm using the transactions provided by the Kafka Producer API in a Scala 
project built with SBT. The dependency used in the project is: 
{code:java}
"org.apache.kafka" % "kafka-clients" % "2.1.0" {code}
I expected *.commitTransaction* to fail if a problem occurred while sending a 
message, as described in the documentation: 
[https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]

Unfortunately, when I tested this behaviour with a message larger than the size 
accepted by the Kafka broker/cluster, transactions did not work properly.

I tested with a 3-broker Kafka cluster using the default maximum message size 
of 1MB:
 - when the message is exactly 1MB, the transaction is aborted and an exception 
is raised when calling *commitTransaction()*
 - when the message is bigger than 1MB, the transaction completes 
successfully *without* the message being written. No exception is thrown.

As an example, when I produce 9 messages of 1 KB and 1 message of 1.1MB in 
the same transaction, the transaction completes but only 9 messages are 
written to the Kafka cluster.

I tested this behaviour with Kafka versions 2.1.0 and 2.3.0, for both the Kafka 
cluster and the Kafka Producer API.


[jira] [Updated] (KAFKA-10334) Transactions not working properly

2020-08-05 Thread Luis Araujo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luis Araujo updated KAFKA-10334:

Description: 
I'm using the transactions provided by the Kafka Producer API in a Scala 
project built with SBT. The dependency used in the project is: 
{code:java}
"org.apache.kafka" % "kafka-clients" % "2.1.0" {code}
I expected *.commitTransaction* to fail if a problem occurred while sending a 
message, as described in the 
[documentation|[https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]]

[link title|http://example.com]

Unfortunately, when I tested this behaviour with a message larger than the size 
accepted by the Kafka broker/cluster, transactions did not work properly.

I tested with a 3-broker Kafka cluster using the default maximum message size 
of 1MB:
 - when the message is exactly 1MB, the transaction is aborted and an exception 
is raised when calling *commitTransaction()*
 - when the message is bigger than 1MB, the transaction completes 
successfully *without* the message being written. No exception is thrown.

As an example, when I produce 9 messages of 1 KB and 1 message of 1.1MB in 
the same transaction, the transaction completes but only 9 messages are 
written to the Kafka cluster.

I tested this behaviour with Kafka versions 2.1.0 and 2.3.0, for both the Kafka 
cluster and the Kafka Producer API.

  was:
I'm using the transactions provided by the Kafka Producer API in a Scala 
project built with SBT. The dependency used in the project is: 
{code:java}
"org.apache.kafka" % "kafka-clients" % "2.1.0" {code}
I expected *.commitTransaction* to fail if a problem occurred while sending a 
message, as described in the 
[documentation|[https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]]

Unfortunately, when I tested this behaviour with a message larger than the size 
accepted by the Kafka broker/cluster, transactions did not work properly.

I tested with a 3-broker Kafka cluster using the default maximum message size 
of 1MB:
 - when the message is exactly 1MB, the transaction is aborted and an exception 
is raised when calling *commitTransaction()*
 - when the message is bigger than 1MB, the transaction completes 
successfully *without* the message being written. No exception is thrown.

As an example, when I produce 9 messages of 1 KB and 1 message of 1.1MB in 
the same transaction, the transaction completes but only 9 messages are 
written to the Kafka cluster.

I tested this behaviour with Kafka versions 2.1.0 and 2.3.0, for both the Kafka 
cluster and the Kafka Producer API.


[jira] [Updated] (KAFKA-10334) Transactions not working properly

2020-08-05 Thread Luis Araujo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luis Araujo updated KAFKA-10334:

Description: 
I'm using the transactions provided by the Kafka Producer API in a Scala 
project built with SBT. The dependency used in the project is: 
{code:java}
"org.apache.kafka" % "kafka-clients" % "2.1.0" {code}
I expected *.commitTransaction* to fail if a problem occurred while sending a 
message, as described in the 
[documentation|[https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]]

Unfortunately, when I tested this behaviour with a message larger than the size 
accepted by the Kafka broker/cluster, transactions did not work properly.

I tested with a 3-broker Kafka cluster using the default maximum message size 
of 1MB:
 - when the message is exactly 1MB, the transaction is aborted and an exception 
is raised when calling *commitTransaction()*
 - when the message is bigger than 1MB, the transaction completes 
successfully *without* the message being written. No exception is thrown.

As an example, when I produce 9 messages of 1 KB and 1 message of 1.1MB in 
the same transaction, the transaction completes but only 9 messages are 
written to the Kafka cluster.

I tested this behaviour with Kafka versions 2.1.0 and 2.3.0, for both the Kafka 
cluster and the Kafka Producer API.

  was:
I'm using the transactions provided by the Kafka Producer API in a Scala 
project built with SBT. The dependency used in the project is: 
{code:java}
"org.apache.kafka" % "kafka-clients" % "2.1.0" {code}
I expected *.commitTransaction* to fail if a problem occurred while sending a 
message, as described in the 
[documentation|[http://example.com|https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]].

Unfortunately, when I tested this behaviour with a message larger than the size 
accepted by the Kafka broker/cluster, transactions did not work properly.

I tested with a 3-broker Kafka cluster using the default maximum message size 
of 1MB:
 - when the message is exactly 1MB, the transaction is aborted and an exception 
is raised when calling *commitTransaction()*
 - when the message is bigger than 1MB, the transaction completes 
successfully *without* the message being written. No exception is thrown.

As an example, when I produce 9 messages of 1 KB and 1 message of 1.1MB in 
the same transaction, the transaction completes but only 9 messages are 
written to the Kafka cluster.

I tested this behaviour with Kafka versions 2.1.0 and 2.3.0, for both the Kafka 
cluster and the Kafka Producer API.


[jira] [Updated] (KAFKA-10334) Transactions not working properly

2020-08-05 Thread Luis Araujo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luis Araujo updated KAFKA-10334:

Description: 
I'm using the transactions provided by the Kafka Producer API in a Scala 
project built with SBT. The dependency used in the project is: 
{code:java}
"org.apache.kafka" % "kafka-clients" % "2.1.0" {code}
I expected *.commitTransaction* to fail if a problem occurred while sending a 
message, as described in the 
[documentation|[https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]]

[documentation|https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]

Unfortunately, when I tested this behaviour with a message larger than the size 
accepted by the Kafka broker/cluster, transactions did not work properly.

I tested with a 3-broker Kafka cluster using the default maximum message size 
of 1MB:
 - when the message is exactly 1MB, the transaction is aborted and an exception 
is raised when calling *commitTransaction()*
 - when the message is bigger than 1MB, the transaction completes 
successfully *without* the message being written. No exception is thrown.

As an example, when I produce 9 messages of 1 KB and 1 message of 1.1MB in 
the same transaction, the transaction completes but only 9 messages are 
written to the Kafka cluster.

I tested this behaviour with Kafka versions 2.1.0 and 2.3.0, for both the Kafka 
cluster and the Kafka Producer API.

  was:
I'm using the transactions provided by the Kafka Producer API in a Scala 
project built with SBT. The dependency used in the project is: 
{code:java}
"org.apache.kafka" % "kafka-clients" % "2.1.0" {code}
I expected *.commitTransaction* to fail if a problem occurred while sending a 
message, as described in the 
[documentation|[https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]]

[link title|http://example.com]

Unfortunately, when I tested this behaviour with a message larger than the size 
accepted by the Kafka broker/cluster, transactions did not work properly.

I tested with a 3-broker Kafka cluster using the default maximum message size 
of 1MB:
 - when the message is exactly 1MB, the transaction is aborted and an exception 
is raised when calling *commitTransaction()*
 - when the message is bigger than 1MB, the transaction completes 
successfully *without* the message being written. No exception is thrown.

As an example, when I produce 9 messages of 1 KB and 1 message of 1.1MB in 
the same transaction, the transaction completes but only 9 messages are 
written to the Kafka cluster.

I tested this behaviour with Kafka versions 2.1.0 and 2.3.0, for both the Kafka 
cluster and the Kafka Producer API.


[jira] [Updated] (KAFKA-10334) Transactions not working properly

2020-08-05 Thread Luis Araujo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luis Araujo updated KAFKA-10334:

Description: 
I'm using the transactions provided by the Kafka Producer API in a Scala 
project built with SBT. The dependency used in the project is: 
{code:java}
"org.apache.kafka" % "kafka-clients" % "2.1.0" {code}
I expected *.commitTransaction* to fail if a problem occurred while sending a 
message, as described in the 
[documentation.|https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]

Unfortunately, when I tested this behaviour with a message larger than the size 
accepted by the Kafka broker/cluster, transactions did not work properly.

I tested with a 3-broker Kafka cluster using the default maximum message size 
of 1MB:
 - when the message is exactly 1MB, the transaction is aborted and an exception 
is raised when calling *commitTransaction()*
 - when the message is bigger than 1MB, the transaction completes 
successfully *without* the message being written. No exception is thrown.

As an example, when I produce 9 messages of 1 KB and 1 message of 1.1MB in 
the same transaction, the transaction completes but only 9 messages are 
written to the Kafka cluster.

I tested this behaviour with Kafka versions 2.1.0 and 2.3.0, for both the Kafka 
cluster and the Kafka Producer API.

  was:
I'm using the transactions provided by the Kafka Producer API in a Scala 
project built with SBT. The dependency used in the project is: 
{code:java}
"org.apache.kafka" % "kafka-clients" % "2.1.0" {code}
I expected *.commitTransaction* to fail if a problem occurred while sending a 
message, as described in the 
[documentation|[https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]]

[documentation|https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]

Unfortunately, when I tested this behaviour with a message larger than the size 
accepted by the Kafka broker/cluster, transactions did not work properly.

I tested with a 3-broker Kafka cluster using the default maximum message size 
of 1MB:
 - when the message is exactly 1MB, the transaction is aborted and an exception 
is raised when calling *commitTransaction()*
 - when the message is bigger than 1MB, the transaction completes 
successfully *without* the message being written. No exception is thrown.

As an example, when I produce 9 messages of 1 KB and 1 message of 1.1MB in 
the same transaction, the transaction completes but only 9 messages are 
written to the Kafka cluster.

I tested this behaviour with Kafka versions 2.1.0 and 2.3.0, for both the Kafka 
cluster and the Kafka Producer API.


[jira] [Updated] (KAFKA-10334) Transactions not working properly

2020-08-05 Thread Luis Araujo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luis Araujo updated KAFKA-10334:

Description: 
I'm using the transactions provided by the Kafka Producer API in a Scala 
project built with SBT. The dependency used in the project is: 
{code:java}
"org.apache.kafka" % "kafka-clients" % "2.1.0" {code}
I expected *.commitTransaction* to fail if a problem occurred while sending a 
message, as described in the 
[documentation|https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-].

Unfortunately, when I tested this behaviour with a message larger than the size 
accepted by the Kafka broker/cluster, transactions did not work properly.

I tested with a 3-broker Kafka cluster using the default maximum message size 
of 1MB:
 - when the message is exactly 1MB, the transaction is aborted and an exception 
is raised when calling *commitTransaction()*
 - when the message is bigger than 1MB, the transaction completes 
successfully *without* the message being written. No exception is thrown.

As an example, when I produce 9 messages of 1 KB and 1 message of 1.1MB in 
the same transaction, the transaction completes but only 9 messages are 
written to the Kafka cluster.

I tested this behaviour with Kafka versions 2.1.0 and 2.3.0, for both the Kafka 
cluster and the Kafka Producer API.

  was:
I'm using the transactions provided by the Kafka Producer API in a Scala 
project built with SBT. The dependency used in the project is: 
{code:java}
"org.apache.kafka" % "kafka-clients" % "2.1.0" {code}
I expected *.commitTransaction* to fail if a problem occurred while sending a 
message, as described in the 
[documentation.|https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]

Unfortunately, when I tested this behaviour with a message larger than the size 
accepted by the Kafka broker/cluster, transactions did not work properly.

I tested with a 3-broker Kafka cluster using the default maximum message size 
of 1MB:
 - when the message is exactly 1MB, the transaction is aborted and an exception 
is raised when calling *commitTransaction()*
 - when the message is bigger than 1MB, the transaction completes 
successfully *without* the message being written. No exception is thrown.

As an example, when I produce 9 messages of 1 KB and 1 message of 1.1MB in 
the same transaction, the transaction completes but only 9 messages are 
written to the Kafka cluster.

I tested this behaviour with Kafka versions 2.1.0 and 2.3.0, for both the Kafka 
cluster and the Kafka Producer API.
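The updated description links to the documentation of *send(ProducerRecord, Callback)*. Besides waiting on each returned future, an application can track send failures itself: a callback records the first failure and, once every send has completed, the code decides between committing and aborting. The sketch below is a model under stated assumptions, not the Kafka client: it uses plain java.util.concurrent types, *simulatedSendAsync* and *sendAllThenDecide* are hypothetical names, the size check only imitates an oversized record being rejected, and the 1MB limit (1,048,576 bytes) is an assumed value.

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class CallbackErrorTracking {

    // Hypothetical stand-in for an asynchronous send: the returned future
    // fails when the record is larger than maxBytes (not Kafka's real check).
    static CompletableFuture<Integer> simulatedSendAsync(int sizeBytes, int maxBytes) {
        return CompletableFuture.supplyAsync(() -> {
            if (sizeBytes > maxBytes) {
                throw new IllegalStateException(
                    "record of " + sizeBytes + " bytes exceeds " + maxBytes);
            }
            return sizeBytes;
        });
    }

    // Sends every record with a callback that keeps only the first failure,
    // waits until all callbacks have run, then reports whether committing is
    // safe. A false result means the caller should abort instead.
    static boolean sendAllThenDecide(int[] sizes, int maxBytes) {
        AtomicReference<Throwable> firstError = new AtomicReference<>();
        CompletableFuture<?>[] sends = new CompletableFuture<?>[sizes.length];
        for (int i = 0; i < sizes.length; i++) {
            sends[i] = simulatedSendAsync(sizes[i], maxBytes)
                .whenComplete((size, error) -> {
                    if (error != null) {
                        firstError.compareAndSet(null, error);
                    }
                });
        }
        // Wait for every send; individual failures were already recorded above.
        CompletableFuture.allOf(sends).handle((ignored, error) -> null).join();
        return firstError.get() == null;
    }

    public static void main(String[] args) {
        int maxBytes = 1024 * 1024;                  // assumed 1MB limit
        int[] sizes = new int[10];
        Arrays.fill(sizes, 0, 9, 1024);              // nine 1 KB records
        sizes[9] = (int) (1.1 * maxBytes);           // one ~1.1MB record
        System.out.println(sendAllThenDecide(sizes, maxBytes) ? "commit" : "abort");
    }
}
```

Running *main* prints abort, because the tenth (~1.1MB) record fails. Waiting for all sends before reading the recorded error matters: checking earlier could miss a failure whose callback has not run yet.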


[jira] [Commented] (KAFKA-10334) Transactions not working properly

2020-08-05 Thread Luis Araujo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17171586#comment-17171586
 ] 

Luis Araujo commented on KAFKA-10334:
-

After creating this issue, I tested waiting for the future returned by *send* 
to complete before calling *commitTransaction*. Here is a similar example:
{code:java}
Producer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
producer.beginTransaction();
for (int i = 0; i < 100; i++) {
    // Wait for this send to complete, so a failed record throws here
    producer.send(new ProducerRecord<>("my-topic",
        Integer.toString(i), Integer.toString(i))).get();
}
producer.commitTransaction();{code}

With this code, I get an exception when I call *.get* on the Future if the 
message is above (and not equal to) 1MB.
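The example above blocks on each *send* before issuing the next one. A variant issues every send first and checks all the collected futures just before the point where *commitTransaction* would be called. The sketch below is a plain-Java model rather than the Kafka client: *simulatedSend* and *allSendsSucceeded* are hypothetical names, the size check only imitates an oversized record being rejected, and the 1MB limit is an assumed value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class AwaitBeforeCommit {

    // Hypothetical stand-in for producer.send(): the Future fails for any
    // record above maxBytes, and the failure is only visible through get().
    static Future<Integer> simulatedSend(int sizeBytes, int maxBytes) {
        CompletableFuture<Integer> result = new CompletableFuture<>();
        if (sizeBytes > maxBytes) {
            result.completeExceptionally(new IllegalStateException(
                "record of " + sizeBytes + " bytes exceeds " + maxBytes));
        } else {
            result.complete(sizeBytes);
        }
        return result;
    }

    // Calls get() on every collected Future after all sends were issued;
    // returns true (commit) only if none of them failed.
    static boolean allSendsSucceeded(List<Future<Integer>> sends) {
        for (Future<Integer> send : sends) {
            try {
                send.get();
            } catch (ExecutionException e) {
                return false;                         // this record was not written
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();   // preserve the interrupt flag
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int maxBytes = 1024 * 1024;                   // assumed 1MB limit
        List<Future<Integer>> sends = new ArrayList<>();
        for (int i = 0; i < 9; i++) {
            sends.add(simulatedSend(1024, maxBytes)); // nine 1 KB records
        }
        sends.add(simulatedSend((int) (1.1 * maxBytes), maxBytes)); // one ~1.1MB record
        System.out.println(allSendsSucceeded(sends) ? "commit" : "abort");
    }
}
```

Running *main* prints abort. Unlike calling *.get* inside the loop, the sends do not have to wait for one another, yet a failed record is still detected before the transaction is committed, so the caller can abort instead.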

> Transactions not working properly
> -
>
> Key: KAFKA-10334
> URL: https://issues.apache.org/jira/browse/KAFKA-10334
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 2.1.0, 2.3.0
>Reporter: Luis Araujo
>Priority: Major
>
> I'm using transactions provided by Kafka Producer API in a Scala project 
> built with SBT. The dependency used in the project is: 
> {code:java}
> "org.apache.kafka" % "kafka-clients" % "2.1.0" {code}
> I followed the documentation and I was expecting that transactions fail when 
> I call *.commitTransaction* if some problem is raised when sending a message 
> like it's described in the 
> [documentation|https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-].
> Unfortunately, when testing this behaviour using a message larger than the 
> size accepted by the Kafka broker/cluster, the transactions are not working 
> properly.
> I tested with a 3-broker Kafka cluster with a 1MB maximum message size (the 
> default value):
>  - when the message is exactly 1MB, the transaction is aborted and an exception is 
> raised when calling *commitTransaction()*
>  - when the message is bigger than 1MB, the transaction is completed 
> successfully *without* the message being written. No exception is thrown.
> As an example, this means that when I produce 9 messages with 1 KB and 1 
> message with 1.1MB in the same transaction, the transaction is completed but 
> only 9 messages are written to the Kafka cluster.
> I tested this behaviour with Kafka versions 2.1.0 and 2.3.0, for both the 
> Kafka cluster and the Kafka Producer API.





[jira] [Updated] (KAFKA-10334) Transactions not working properly

2020-08-05 Thread Luis Araujo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luis Araujo updated KAFKA-10334:

Description: 
I'm using the transactions provided by the Kafka Producer API in a Scala project 
built with SBT. The dependency used in the project is: 
{code:java}
"org.apache.kafka" % "kafka-clients" % "2.1.0" {code}
Following the documentation, I expected *.commitTransaction* to fail if a problem 
was raised while sending a message, as described in the 
[documentation|https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-].

Unfortunately, when testing this behaviour with a message larger than the size 
accepted by the Kafka broker/cluster, the transactions do not work properly.

I tested with a 3-broker Kafka cluster with a 1MB maximum message size (the 
default value):
 - when the message is exactly 1MB, the transaction is aborted and an exception is 
raised when calling *commitTransaction()*
 - when the message is bigger than 1MB, the transaction is completed 
successfully *without* the message being written. No exception is thrown.

As an example, this means that when I produce 9 messages with 1 KB and 1 
message with 1.1MB in the same transaction, the transaction is completed but 
only 9 messages are written to the Kafka cluster.

I tested this behaviour with Kafka versions 2.1.0 and 2.3.0, for both the Kafka 
cluster and the Kafka Producer API.

The configs I'm using to create the KafkaProducer with transactions enabled:
{code:java}
new Properties() {
  {
    put(BOOTSTRAP_SERVERS_CONFIG, "localhost:29092,localhost:29093,localhost:29094")
    put(ACKS_CONFIG, "-1")
    put(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
    put(KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    put(VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer])
    put(CLIENT_ID_CONFIG, "app")
    put(TRANSACTIONAL_ID_CONFIG, "app")
    put(ENABLE_IDEMPOTENCE_CONFIG, "true")
  }
}
{code}
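For anyone reproducing this from Java rather than Scala, the same configuration can be expressed with the plain string keys that the `ProducerConfig` constants above resolve to. This is a sketch: the class and method names are made up, and the serializers are given as class-name strings, which the producer also accepts.

```java
import java.util.Properties;

// Hypothetical helper: a plain-Java equivalent of the Scala config above.
public class TransactionalProducerProps {
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:29092,localhost:29093,localhost:29094");
        props.put("acks", "-1"); // equivalent to "all"
        props.put("max.in.flight.requests.per.connection", "1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("client.id", "app");
        props.put("transactional.id", "app");
        props.put("enable.idempotence", "true");
        return props;
    }
}
```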

  was:
I'm using the transactions provided by the Kafka Producer API in a Scala project 
built with SBT. The dependency used in the project is: 
{code:java}
"org.apache.kafka" % "kafka-clients" % "2.1.0" {code}
Following the documentation, I expected *.commitTransaction* to fail if a problem 
was raised while sending a message, as described in the 
[documentation|https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-].

Unfortunately, when testing this behaviour with a message larger than the size 
accepted by the Kafka broker/cluster, the transactions do not work properly.

I tested with a 3-broker Kafka cluster with a 1MB maximum message size (the 
default value):
 - when the message is exactly 1MB, the transaction is aborted and an exception is 
raised when calling *commitTransaction()*
 - when the message is bigger than 1MB, the transaction is completed 
successfully *without* the message being written. No exception is thrown.

As an example, this means that when I produce 9 messages with 1 KB and 1 
message with 1.1MB in the same transaction, the transaction is completed but 
only 9 messages are written to the Kafka cluster.

I tested this behaviour with Kafka versions 2.1.0 and 2.3.0, for both the Kafka 
cluster and the Kafka Producer API.



[jira] [Comment Edited] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics

2020-08-05 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17171612#comment-17171612
 ] 

Ning Zhang edited comment on KAFKA-10339 at 8/5/20, 4:44 PM:
-

Thanks for the input. I think that sounds like a workable plan. Here are my 
follow-up thoughts:

_"when MirrorSourceTask starts, it loads initial offsets from 
__consumer_offsets on the target cluster."_

Since the consumer is configured to pull data from the source cluster, I think 
we probably need to:
(1) add a new API (called "Map loadOffsets()") to 
[SinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java]
 
(2) in MirrorSinkTask.java, implement/override loadOffsets() to supply the 
consumer offsets loaded from the target cluster.
(3) in 
[WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295],
 when initializing the consumer, if `task.loadOffsets()` returns a non-empty map, use 
the returned offsets as the starting point.
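Steps (1) to (3) can be sketched in plain Java as below. Everything here is hypothetical: the interface and class names are invented, the archive stripped the generic types from "Map loadOffsets()", so the `Map<String, Long>` shape (a "topic-partition" string key mapped to an offset) is an assumption, and reading offsets from the target cluster is replaced by a map passed to the constructor.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class LoadOffsetsSketch {
    /** Step (1), hypothetical: a sink task may supply its own starting offsets; default is none. */
    public interface OffsetAwareSinkTask {
        default Map<String, Long> loadOffsets() {
            return Collections.emptyMap();
        }
    }

    /** Step (2), hypothetical: a mirror task returns offsets it read from the target cluster. */
    public static final class MirrorLikeSinkTask implements OffsetAwareSinkTask {
        private final Map<String, Long> offsetsFromTarget;

        public MirrorLikeSinkTask(Map<String, Long> offsetsFromTarget) {
            // Stands in for reading a "fake" consumer group's offsets on the target cluster.
            this.offsetsFromTarget = new HashMap<>(offsetsFromTarget);
        }

        @Override
        public Map<String, Long> loadOffsets() {
            return Collections.unmodifiableMap(offsetsFromTarget);
        }
    }

    /** Step (3), hypothetical: the worker prefers task-supplied offsets, else falls back to committed ones. */
    public static Map<String, Long> startingOffsets(OffsetAwareSinkTask task, Map<String, Long> committed) {
        Map<String, Long> loaded = task.loadOffsets();
        return loaded.isEmpty() ? committed : loaded;
    }
}
```

A default method keeps existing sink tasks unaffected: they return an empty map, so the worker keeps using the offsets it already had.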

_"in addition, we call addOffsetsToTransaction in order to commit offsets to 
the "fake" consumer group on the target cluster."_

I fully agree with the "fake" consumer group on the target cluster. I wonder 
whether "addOffsetsToTransaction" is already taken care of by 
*producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId);*?


was (Author: yangguo1220):
Thanks for the input. I think that sounds like a workable plan. Here are my 
follow-up thoughts:

_"when MirrorSourceTask starts, it loads initial offsets from 
__consumer_offsets on the target cluster."_

Since the consumer is configured to pull data from the source cluster, I think 
we probably need to:
(1) add a new API (called "Map loadOffsets()") to 
[SinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java]
 
(2) in MirrorSinkTask.java, implement/override loadOffsets() to supply the 
consumer offsets loaded from the target cluster.
(3) in 
[WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295],
 when initializing the consumer, if `task.loadOffsets()` returns a non-empty map, use 
the returned offsets as the starting point.

_"in addition, we call addOffsetsToTransaction in order to commit offsets to 
the "fake" consumer group on the target cluster."_
I fully agree with the "fake" consumer group on the target cluster. I wonder 
whether "addOffsetsToTransaction" is already taken care of by 
*producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId);*?

> MirrorMaker2 Exactly-once Semantics
> ---
>
> Key: KAFKA-10339
> URL: https://issues.apache.org/jira/browse/KAFKA-10339
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
>  Labels: needs-kip
>
> MirrorMaker2 is currently implemented on the Kafka Connect framework, more 
> specifically as a Source Connector / Task, which does not provide exactly-once 
> semantics (EOS) out of the box, as discussed in 
> https://github.com/confluentinc/kafka-connect-jdbc/issues/461,  
> https://github.com/apache/kafka/pull/5553, 
> https://issues.apache.org/jira/browse/KAFKA-6080  and 
> https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 
> currently does not provide EOS.





[jira] [Commented] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics

2020-08-05 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17171612#comment-17171612
 ] 

Ning Zhang commented on KAFKA-10339:


Thanks for the input. I think that sounds like a workable plan. Here are my 
follow-up thoughts:

_"when MirrorSourceTask starts, it loads initial offsets from 
__consumer_offsets on the target cluster."_

Since the consumer is configured to pull data from the source cluster, I think 
we probably need to:
(1) add a new API (called "Map loadOffsets()") to 
[SinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java]
 
(2) in MirrorSinkTask.java, implement/override loadOffsets() to supply the 
consumer offsets loaded from the target cluster.
(3) in 
[WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295],
 when initializing the consumer, if `task.loadOffsets()` returns a non-empty map, use 
the returned offsets as the starting point.

_"in addition, we call addOffsetsToTransaction in order to commit offsets to 
the "fake" consumer group on the target cluster."_
I fully agree with the "fake" consumer group on the target cluster. I wonder 
whether "addOffsetsToTransaction" is already taken care of by 
*producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId);*?






[jira] [Comment Edited] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics

2020-08-05 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17171612#comment-17171612
 ] 

Ning Zhang edited comment on KAFKA-10339 at 8/5/20, 4:47 PM:
-

Thanks for the input. I think that sounds like a workable plan. Here are my 
follow-up thoughts:

_"when MirrorSourceTask starts, it loads initial offsets from 
__consumer_offsets on the target cluster."_

Since the consumer is configured to pull data from the source cluster, I think 
we probably need to:
(1) add a new API (called "Map loadOffsets()") to 
[SinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java]
 
(2) in MirrorSinkTask.java, implement/override loadOffsets() to supply the 
consumer offsets loaded from the target cluster.
(3) in 
[WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295],
 when initializing the consumer, if `task.loadOffsets()` returns a non-empty map, use 
the returned offsets as the starting point for the consumer in WorkerSinkTask.

_"in addition, we call addOffsetsToTransaction in order to commit offsets to 
the "fake" consumer group on the target cluster."_

I fully agree with the "fake" consumer group on the target cluster. I wonder 
whether "addOffsetsToTransaction" is already taken care of by 
*producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId);*?


was (Author: yangguo1220):
Thanks for the input. I think that sounds like a workable plan. Here are my 
follow-up thoughts:

_"when MirrorSourceTask starts, it loads initial offsets from 
__consumer_offsets on the target cluster."_

Since the consumer is configured to pull data from the source cluster, I think 
we probably need to:
(1) add a new API (called "Map loadOffsets()") to 
[SinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java]
 
(2) in MirrorSinkTask.java, implement/override loadOffsets() to supply the 
consumer offsets loaded from the target cluster.
(3) in 
[WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295],
 when initializing the consumer, if `task.loadOffsets()` returns a non-empty map, use 
the returned offsets as the starting point.

_"in addition, we call addOffsetsToTransaction in order to commit offsets to 
the "fake" consumer group on the target cluster."_

I fully agree with the "fake" consumer group on the target cluster. I wonder 
whether "addOffsetsToTransaction" is already taken care of by 
*producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId);*?






[GitHub] [kafka] junrao commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-05 Thread GitBox


junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r465867406



##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -369,6 +370,32 @@ class GroupCoordinator(val brokerId: Int,
 }
   }
 
+  /**
+   * try to complete produce, fetch and delete requests if the HW of partition is incremented. Otherwise, we try to complete
+   * only delayed fetch requests.
+   *
+   * Note that this method may hold multiple group locks so the caller should NOT hold any group lock
+   * in order to avoid deadlock
+   * @param topicPartitions a map containing each partition and a flag indicating whether the HWM has been changed
+   */
+  private[group] def completeDelayedRequests(topicPartitions: Map[TopicPartition, LeaderHwChange]): Unit =
+    topicPartitions.foreach {
+      case (tp, leaderHWIncremented) => leaderHWIncremented match {
+        case LeaderHwIncremented => groupManager.replicaManager.completeDelayedRequests(tp)
+        case _ => groupManager.replicaManager.completeDelayedFetchRequests(tp)
+      }
+    }
+
+  /**
+   * complete the delayed join requests associated with input group keys.
+   *
+   * Note that delayedJoin itself uses a lock other than the group lock for DelayedOperation.maybeTryComplete() and

Review comment:
   @chia7712 : Yes, that's a possibility. It adds some complexity to 
DelayedOperation. Another possibility is to have a special case to complete the 
delayed requests from groupManager.storeGroup() in 
GroupCoordinator.onCompleteJoin() in a separate thread.
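The second option (completing the delayed requests in a separate thread) can be sketched roughly as below. This is a hypothetical Java illustration, not Kafka code: the class, the single `groupLock`, and the `onCompleteJoin` signature are invented. The point it shows is that the group state is mutated under the group lock, while the delayed-request completion is handed to a dedicated thread that never holds that lock, so it is free to take other group locks.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch; names are illustrative and do not exist in Kafka.
public class SeparateThreadCompletion {
    private final ReentrantLock groupLock = new ReentrantLock();
    // Dedicated thread for completing delayed requests outside any group lock.
    private final ExecutorService completionThread = Executors.newSingleThreadExecutor();

    /**
     * Updates group state under the group lock, then hands delayed-request completion
     * to another thread. The future yields true if the completion ran without the group lock.
     */
    public CompletableFuture<Boolean> onCompleteJoin(Runnable completeDelayedRequests) {
        groupLock.lock();
        try {
            // ... mutate group state and store the group (elided) ...
            return CompletableFuture.supplyAsync(() -> {
                boolean lockFree = !groupLock.isHeldByCurrentThread();
                completeDelayedRequests.run();
                return lockFree;
            }, completionThread);
        } finally {
            groupLock.unlock();
        }
    }

    public void shutdown() {
        completionThread.shutdown();
    }
}
```

The trade-off is an extra thread hop per join completion in exchange for not having to change how DelayedOperation acquires its locks.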









[GitHub] [kafka] guozhangwang commented on pull request #8964: KAFKA-9450: Decouple flushing state from commiting

2020-08-05 Thread GitBox


guozhangwang commented on pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#issuecomment-669308138


   System test green: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4093/
   
   Ready for a final review and merge cc @mjsax 







[GitHub] [kafka] dielhennr commented on a change in pull request #9101: KAFKA-10325: KIP-649 implementation

2020-08-05 Thread GitBox


dielhennr commented on a change in pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#discussion_r465867372



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicConsumerConfig.java
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.DynamicClientConfigUpdater;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.DescribeConfigsRequest;
+import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+/**
+ * Handles the request and response of a dynamic client configuration update for the consumer
+ */
+public class DynamicConsumerConfig extends DynamicClientConfigUpdater {
+/* Client to use */
+private ConsumerNetworkClient client;
+
+/* Configs to update */
+private GroupRebalanceConfig rebalanceConfig;
+
+/* Object to synchronize on when response is received */
+Object lock;

Review comment:
   I think I forgot to remove this after starting to use atomics to track 
the interval and in-progress updates.
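Replacing the lock object with atomics, as described above, might look like the following. This is a hypothetical sketch, not the code in PR #9101: the class and method names are invented, and the interval passed to `completeUpdate` is assumed to come from wherever the real client learns it.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch; not the code from PR #9101.
public class DynamicConfigUpdateState {
    // True while a describe-configs request is in flight.
    private final AtomicBoolean updateInProgress = new AtomicBoolean(false);
    // Earliest time (ms) at which the next update may be sent; 0 means "send immediately".
    private final AtomicLong nextUpdateMs = new AtomicLong(0);

    /** Returns true if the caller won the right to send an update now. */
    public boolean tryStartUpdate(long nowMs) {
        if (nowMs < nextUpdateMs.get()) {
            return false; // interval has not elapsed yet
        }
        // Only one caller can flip the flag from false to true, so no lock object is needed.
        return updateInProgress.compareAndSet(false, true);
    }

    /** Records a completed update and schedules the next one intervalMs later. */
    public void completeUpdate(long nowMs, long intervalMs) {
        // Publish the next deadline before clearing the flag so racing callers see it.
        nextUpdateMs.set(nowMs + intervalMs);
        updateInProgress.set(false);
    }

    public boolean inProgress() {
        return updateInProgress.get();
    }
}
```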









[GitHub] [kafka] guozhangwang commented on pull request #9127: MINOR: fix HTML

2020-08-05 Thread GitBox


guozhangwang commented on pull request #9127:
URL: https://github.com/apache/kafka/pull/9127#issuecomment-669320718


   LGTM!







[GitHub] [kafka] guozhangwang commented on pull request #9127: MINOR: fix HTML

2020-08-05 Thread GitBox


guozhangwang commented on pull request #9127:
URL: https://github.com/apache/kafka/pull/9127#issuecomment-669320891


   The kafka-site has already been updated, so yes you do need to have a 
separate PR for it :P







[GitHub] [kafka] dajac commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

2020-08-05 Thread GitBox


dajac commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r465886919



##
File path: clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
##
@@ -209,4 +212,95 @@ public void shouldReturnPresenceOfMetrics() {
 
 assertThat(sensor.hasMetrics(), is(true));
 }
+
+@Test
+public void testStrictQuotaEnforcementWithRate() {
+final Time time = new MockTime(0, System.currentTimeMillis(), 0);
+final Metrics metrics = new Metrics(time);
+final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+.quota(Quota.upperBound(2))
+.timeWindow(1, TimeUnit.SECONDS)
+.samples(11));
+final MetricName metricName = metrics.metricName("rate", "test-group");
+assertTrue(sensor.add(metricName, new Rate()));
+final KafkaMetric rateMetric = metrics.metric(metricName);
+
+// Recording a first value at T+0 to bring the avg rate to 3 which is already
+// above the quota.
+strictRecord(sensor, 30, time.milliseconds());
+assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+// Theoretically, we should wait 5s to bring back the avg rate to the defined quota:
+// ((30 / 10) - 2) / 2 * 10 = 5s
+time.sleep(5000);
+
+// But, recording a second value is rejected because the avg rate is still equal
+// to 3 after 5s.
+assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1);
+assertThrows(QuotaViolationException.class, () -> strictRecord(sensor, 30, time.milliseconds()));
+
+metrics.close();
+}
+
+@Test
+public void testStrictQuotaEnforcementWithTokenBucket() {
+final Time time = new MockTime(0, System.currentTimeMillis(), 0);
+final Metrics metrics = new Metrics(time);
+final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+.quota(Quota.upperBound(2))
+.timeWindow(1, TimeUnit.SECONDS)
+.samples(11));
+final MetricName metricName = metrics.metricName("credits", "test-group");
+assertTrue(sensor.add(metricName, new TokenBucket()));
+final KafkaMetric tkMetric = metrics.metric(metricName);
+
+// Recording a first value at T+0 to bring the remaining credits below zero
+strictRecord(sensor, 30, time.milliseconds());
+assertEquals(-10, tkMetric.measurableValue(time.milliseconds()), 0.1);
+
+// Theoretically, we should wait 5s to bring back the avg rate to the defined quota:
+// 10 / 2 = 5s
+time.sleep(5000);
+
+// Unlike the default rate based on a windowed sum, it works as expected.
+assertEquals(0, tkMetric.measurableValue(time.milliseconds()), 0.1);
+strictRecord(sensor, 30, time.milliseconds());
+assertEquals(-30, tkMetric.measurableValue(time.milliseconds()), 0.1);
+
+metrics.close();
+}
+
+private void strictRecord(Sensor sensor, double value, long timeMs) {
+synchronized (sensor) {
+sensor.checkQuotas(timeMs);

Review comment:
In the two tests above, I simulate "strict quotas", in the sense that 
recording is not allowed if the quota is already violated. Therefore, I check 
the quota before recording the value.
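The expected credits in `testStrictQuotaEnforcementWithTokenBucket` (-10, then 0, then -30) follow from the refill/record arithmetic of the TokenBucket class under review. The standalone re-implementation below mirrors that arithmetic with the unit fixed to seconds; it is a sketch for checking the numbers, not Kafka's class, and it leaves out the strict quota check. With a quota of 2/s, 11 samples and a 1s window, the burst is (11 - 1) * 1 * 2 = 20.

```java
// Standalone sketch mirroring the refill/record arithmetic of the TokenBucket diff (seconds unit).
public class TokenBucketWalkthrough {
    private final double quotaPerSec;
    private final double burst;
    private double tokens = 0;      // starts empty, like the diff's constructor
    private long lastUpdateMs = 0;

    public TokenBucketWalkthrough(double quotaPerSec, int samples, long windowMs) {
        this.quotaPerSec = quotaPerSec;
        // burst = (samples - 1) * window (in seconds) * quota, as in the diff's burst()
        this.burst = (samples - 1) * (windowMs / 1000.0) * quotaPerSec;
    }

    // Adds quota * elapsed seconds, capped at burst.
    private void refill(long timeMs) {
        tokens = Math.min(burst, tokens + quotaPerSec * ((timeMs - lastUpdateMs) / 1000.0));
        lastUpdateMs = timeMs;
    }

    public double measure(long timeMs) {
        refill(timeMs);
        return tokens;
    }

    public void record(double value, long timeMs) {
        refill(timeMs);
        tokens = Math.min(burst, tokens - value);
    }
}
```

Because `lastUpdateMs` starts at 0, the first refill at a wall-clock timestamp fills the bucket to the burst of 20; recording 30 leaves -10, and 5 seconds at 2 credits/s brings it back to exactly 0.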









[GitHub] [kafka] dajac commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

2020-08-05 Thread GitBox


dajac commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r46517



##
File path: clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
##
@@ -209,4 +212,95 @@ public void shouldReturnPresenceOfMetrics() {
 
 assertThat(sensor.hasMetrics(), is(true));
 }
+
+@Test
+public void testStrictQuotaEnforcementWithRate() {
+final Time time = new MockTime(0, System.currentTimeMillis(), 0);
+final Metrics metrics = new Metrics(time);
+final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+.quota(Quota.upperBound(2))
+.timeWindow(1, TimeUnit.SECONDS)
+.samples(11));
+final MetricName metricName = metrics.metricName("rate", "test-group");
+assertTrue(sensor.add(metricName, new Rate()));
+final KafkaMetric rateMetric = metrics.metric(metricName);
+
+// Recording a first value at T+0 to bring the avg rate to 3 which is already
+// above the quota.
+strictRecord(sensor, 30, time.milliseconds());
+assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+// Theoretically, we should wait 5s to bring back the avg rate to the defined quota:
+// ((30 / 10) - 2) / 2 * 10 = 5s
+time.sleep(5000);
+
+// But, recording a second value is rejected because the avg rate is still equal
+// to 3 after 5s.
+assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1);
+assertThrows(QuotaViolationException.class, () -> strictRecord(sensor, 30, time.milliseconds()));
+
+metrics.close();
+}
+
+@Test
+public void testStrictQuotaEnforcementWithTokenBucket() {
+final Time time = new MockTime(0, System.currentTimeMillis(), 0);
+final Metrics metrics = new Metrics(time);
+final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+.quota(Quota.upperBound(2))
+.timeWindow(1, TimeUnit.SECONDS)
+.samples(11));
+final MetricName metricName = metrics.metricName("credits", "test-group");
+assertTrue(sensor.add(metricName, new TokenBucket()));
+final KafkaMetric tkMetric = metrics.metric(metricName);
+
+// Recording a first value at T+0 to bring the remaining credits below zero
+strictRecord(sensor, 30, time.milliseconds());
+assertEquals(-10, tkMetric.measurableValue(time.milliseconds()), 0.1);
+
+// Theoretically, we should wait 5s to bring back the avg rate to the defined quota:
+// 10 / 2 = 5s
+time.sleep(5000);
+
+// Unlike the default rate based on a windowed sum, it works as expected.
+assertEquals(0, tkMetric.measurableValue(time.milliseconds()), 0.1);
+strictRecord(sensor, 30, time.milliseconds());
+assertEquals(-30, tkMetric.measurableValue(time.milliseconds()), 0.1);
+
+metrics.close();
+}
+
+private void strictRecord(Sensor sensor, double value, long timeMs) {
+synchronized (sensor) {
+sensor.checkQuotas(timeMs);
+sensor.record(value, timeMs, false);
+}
+}
+
+@Test
+public void testRecordAndCheckQuotaUseMetricConfigOfEachStat() {
+final Time time = new MockTime(0, System.currentTimeMillis(), 0);
+final Metrics metrics = new Metrics(time);
+final Sensor sensor = metrics.sensor("sensor");
+
+final MeasurableStat stat1 = Mockito.mock(MeasurableStat.class);
+final MetricName stat1Name = metrics.metricName("stat1", "test-group");
+final MetricConfig stat1Config = new MetricConfig().quota(Quota.upperBound(5));
+sensor.add(stat1Name, stat1, stat1Config);
+
+final MeasurableStat stat2 = Mockito.mock(MeasurableStat.class);
+final MetricName stat2Name = metrics.metricName("stat2", "test-group");
+final MetricConfig stat2Config = new MetricConfig().quota(Quota.upperBound(10));
+sensor.add(stat2Name, stat2, stat2Config);
+
+sensor.record(10, 1);
+Mockito.verify(stat1).record(stat1Config, 10, 1);
+Mockito.verify(stat2).record(stat2Config, 10, 1);
+
+Mockito.when(stat1.measure(stat1Config, 2)).thenReturn(2.0);
+Mockito.when(stat2.measure(stat2Config, 2)).thenReturn(2.0);
+sensor.checkQuotas(2);

Review comment:
No. This test actually reproduces a bug that I found. A stat can be added 
to the Sensor with its own MetricConfig, but the Sensor ignored that config when 
recording a value and always used the Sensor's own config instead. This test 
verifies that the correct config is used both for recording and for measuring 
via checkQuotas. 
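The bug described above can be illustrated with a hypothetical mini-sensor (not Kafka's `Sensor`): each stat is registered together with its own config, and `record` must hand each stat that config rather than the sensor-level one. In this sketch a null config falls back to the sensor's config; the `Stat` and `Config` types are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mini-sensor illustrating per-stat configs; not Kafka's Sensor.
public class PerStatConfigSensor {
    public interface Stat {
        void record(Config config, double value, long timeMs);
    }

    public static final class Config {
        final double quotaBound;
        public Config(double quotaBound) { this.quotaBound = quotaBound; }
    }

    private static final class Registered {
        final Stat stat;
        final Config config;
        Registered(Stat stat, Config config) { this.stat = stat; this.config = config; }
    }

    private final Config sensorConfig;
    private final List<Registered> stats = new ArrayList<>();

    public PerStatConfigSensor(Config sensorConfig) {
        this.sensorConfig = sensorConfig;
    }

    /** Registers a stat; a null config falls back to the sensor's own config. */
    public void add(Stat stat, Config config) {
        stats.add(new Registered(stat, config != null ? config : sensorConfig));
    }

    public void record(double value, long timeMs) {
        for (Registered r : stats) {
            // The buggy behaviour would be passing sensorConfig here for every stat.
            r.stat.record(r.config, value, timeMs);
        }
    }
}
```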






[GitHub] [kafka] dajac commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

2020-08-05 Thread GitBox


dajac commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r465889286



##
File path: core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
##
@@ -156,6 +174,24 @@ class ControllerMutationQuotaManager(private val config: ClientQuotaManagerConfi
   quotaMetricTags.asJava)
   }
 
+  protected def clientTokenBucketMetricName(quotaMetricTags: Map[String, String]): MetricName = {

Review comment:
   Yes.









[GitHub] [kafka] dajac commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

2020-08-05 Thread GitBox


dajac commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r465889447



##
File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+public class TokenBucket implements MeasurableStat {
+private final TimeUnit unit;
+private double tokens;
+private long lastUpdateMs;
+
+public TokenBucket() {
+this(TimeUnit.SECONDS);
+}
+
+public TokenBucket(TimeUnit unit) {
+this.unit = unit;
+this.tokens = 0;
+this.lastUpdateMs = 0;
+}
+
+@Override
+public double measure(final MetricConfig config, final long timeMs) {
+if (config.quota() == null)
+return Long.MAX_VALUE;
+final double quota = config.quota().bound();
+final double burst = burst(config);
+refill(quota, burst, timeMs);
+return this.tokens;
+}
+
+@Override
+public void record(final MetricConfig config, final double value, final long timeMs) {
+if (config.quota() == null)
+return;
+final double quota = config.quota().bound();
+final double burst = burst(config);
+refill(quota, burst, timeMs);
+this.tokens = Math.min(burst, this.tokens - value);
+}
+
+private void refill(final double quota, final double burst, final long timeMs) {
+this.tokens = Math.min(burst, this.tokens + quota * convert(timeMs - lastUpdateMs));
+this.lastUpdateMs = timeMs;
+}
+
+private double burst(final MetricConfig config) {
+return (config.samples() - 1) * convert(config.timeWindowMs()) * config.quota().bound();
+}
+
+private double convert(final long timeMs) {
+switch (unit) {

Review comment:
   Sure, makes sense.









[GitHub] [kafka] dajac commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

2020-08-05 Thread GitBox


dajac commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r465889824



##
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+public class TokenBucket implements MeasurableStat {
+private final TimeUnit unit;
+private double tokens;
+private long lastUpdateMs;
+
+public TokenBucket() {
+this(TimeUnit.SECONDS);
+}
+
+public TokenBucket(TimeUnit unit) {
+this.unit = unit;
+this.tokens = 0;
+this.lastUpdateMs = 0;
+}
+
+@Override
+public double measure(final MetricConfig config, final long timeMs) {
+if (config.quota() == null)
+return Long.MAX_VALUE;
+final double quota = config.quota().bound();
+final double burst = burst(config);
+refill(quota, burst, timeMs);
+return this.tokens;
+}
+
+@Override
+public void record(final MetricConfig config, final double value, final long timeMs) {
+if (config.quota() == null)
+return;
+final double quota = config.quota().bound();
+final double burst = burst(config);
+refill(quota, burst, timeMs);
+this.tokens = Math.min(burst, this.tokens - value);
+}
+
+private void refill(final double quota, final double burst, final long timeMs) {
+this.tokens = Math.min(burst, this.tokens + quota * convert(timeMs - lastUpdateMs));
+this.lastUpdateMs = timeMs;
+}
+
+private double burst(final MetricConfig config) {
+return (config.samples() - 1) * convert(config.timeWindowMs()) * config.quota().bound();

Review comment:
   Ack. I misunderstood it. `config.samples()` sounds good to me.
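
   For readers following the burst discussion, here is a minimal standalone model of the refill/record logic in the diff, using `config.samples()` (not `samples() - 1`) for the burst. It is a sketch under assumptions, not the PR's class: `SimpleTokenBucket` is a hypothetical name, it takes the rate, sample count and window directly instead of a `MetricConfig`, and it starts with a full bucket (the version under review appears to reach the same state on its first refill, because `lastUpdateMs` starts at 0).

```java
// Standalone sketch of the refill/record logic in the TokenBucket diff.
// SimpleTokenBucket is a hypothetical name; it takes the rate, sample count and
// window directly instead of reading them from MetricConfig/Quota.
final class SimpleTokenBucket {
    private final double ratePerSec; // refill rate, i.e. the quota bound
    private final double burst;      // maximum number of credits
    private double tokens;
    private long lastUpdateMs;

    SimpleTokenBucket(double ratePerSec, int samples, long windowMs, long nowMs) {
        this.ratePerSec = ratePerSec;
        // burst = samples * window (seconds) * rate, using samples rather than samples - 1
        this.burst = samples * (windowMs / 1000.0) * ratePerSec;
        this.tokens = burst;          // assumption: the bucket starts full
        this.lastUpdateMs = nowMs;
    }

    // Adds credits for the elapsed time, never exceeding the burst.
    private void refill(long nowMs) {
        double elapsedSec = (nowMs - lastUpdateMs) / 1000.0;
        tokens = Math.min(burst, tokens + ratePerSec * elapsedSec);
        lastUpdateMs = nowMs;
    }

    double measure(long nowMs) {
        refill(nowMs);
        return tokens;
    }

    // Consumes credits; the balance may go negative, and a negative value
    // (an "unrecord") gives credits back, capped at the burst.
    void record(double value, long nowMs) {
        refill(nowMs);
        tokens = Math.min(burst, tokens - value);
    }
}
```

   With a rate of 5 units/s, 10 samples and a 2 s window, the burst is 100, and recording 60 at T, 5 at T+2s and 60 at T+4s leaves 40, 45 and -5 credits, the same values asserted in `TokenBucketTest.testRecord`.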

##
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+public class TokenBucket implements MeasurableStat {

Review comment:
   Sure.









[GitHub] [kafka] kkonstantine closed pull request #8513: MINOR: jackson 2.10.3

2020-08-05 Thread GitBox


kkonstantine closed pull request #8513:
URL: https://github.com/apache/kafka/pull/8513


   







[jira] [Commented] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics

2020-08-05 Thread Ryanne Dolan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17171642#comment-17171642
 ] 

Ryanne Dolan commented on KAFKA-10339:
--

I believe your proposed task.loadOffsets() is taken care of by the existing 
SinkTaskContext.offsets() method actually. This mechanism is used in other 
SinkTasks where the offsets are stored in the downstream system. I think we may 
already have all the interfaces required to make this work.
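
A minimal sketch of the pattern described here, where a sink task restores its position from offsets kept in the downstream system. It assumes the rewind hook behaves like Kafka Connect's `SinkTaskContext.offset(Map<TopicPartition, Long>)`, called for example from `SinkTask.open(...)`; to stay self-contained, `OffsetRewinder`, `DownstreamOffsetStore` and `OffsetRestoringTask` are hypothetical stand-ins and partitions are plain strings.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Stand-in for the offset-rewind hook on SinkTaskContext; partitions are plain
// strings here instead of TopicPartition.
interface OffsetRewinder {
    void offset(Map<String, Long> offsets);
}

// Stand-in for offsets stored in the downstream (target) system together with the data.
interface DownstreamOffsetStore {
    Map<String, Long> lastWritten(Collection<String> partitions);
}

final class OffsetRestoringTask {
    private final OffsetRewinder context;
    private final DownstreamOffsetStore store;

    OffsetRestoringTask(OffsetRewinder context, DownstreamOffsetStore store) {
        this.context = context;
        this.store = store;
    }

    // Called when partitions are assigned (the role SinkTask.open plays in Connect).
    void open(Collection<String> partitions) {
        Map<String, Long> resumeAt = new HashMap<>();
        // Assumption: the store holds the last offset written, so resume at the next one.
        store.lastWritten(partitions).forEach((tp, last) -> resumeAt.put(tp, last + 1));
        if (!resumeAt.isEmpty()) {
            context.offset(resumeAt);
        }
    }
}
```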

> MirrorMaker2 Exactly-once Semantics
> ---
>
> Key: KAFKA-10339
> URL: https://issues.apache.org/jira/browse/KAFKA-10339
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
>  Labels: needs-kip
>
> MirrorMaker2 is currently implemented on Kafka Connect Framework, more 
> specifically the Source Connector / Task, which do not provide exactly-once 
> semantics (EOS) out-of-the-box, as discussed in 
> https://github.com/confluentinc/kafka-connect-jdbc/issues/461,  
> https://github.com/apache/kafka/pull/5553, 
> https://issues.apache.org/jira/browse/KAFKA-6080  and 
> https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 
> currently does not provide EOS.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kkonstantine commented on pull request #8513: MINOR: jackson 2.10.3

2020-08-05 Thread GitBox


kkonstantine commented on pull request #8513:
URL: https://github.com/apache/kafka/pull/8513#issuecomment-669328325


   Closing here in favor of https://github.com/apache/kafka/pull/9058
   Thanks for suggesting the version upgrade @sullis 







[GitHub] [kafka] junrao commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-05 Thread GitBox


junrao commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-669330984


   @chia7712 : The following is my thought after thinking about this a bit 
more. The changes that we made in DelayedJoin are complicated, and they still 
don't completely solve the deadlock issue. Adding more complexity there to 
solve the issue is probably not ideal. As a compromise, I was thinking of another 
approach. We could pass down a flag to ReplicaManager.appendRecords() to 
complete the delayed requests in a separate thread there. Only GroupCoordinator 
will set this flag, so the background completeness check is limited to the 
offset_commit topic. Since this topic is low volume, a single thread is likely 
enough to handle the load, so we don't have to make the number of threads 
configurable or worry about this thread being overwhelmed. The 
benefit of this approach is that the code is probably easier to understand, 
since we could keep most of the existing logic in GroupCoordinator unchanged. What do 
you think?
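
   A rough sketch of the proposed flag, assuming the completion step can be expressed as a `Runnable`. `AppendPath`, `appendRecords(records, completeInBackground)` and the thread name are hypothetical, not the actual `ReplicaManager.appendRecords()` signature; the point is only that callers that set the flag hand completion to one dedicated thread instead of running it themselves.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of the proposal: AppendPath and appendRecords are
// stand-ins, not ReplicaManager's real classes or signatures.
final class AppendPath implements AutoCloseable {
    // One background thread, on the assumption that only the low-volume
    // GroupCoordinator path sets the flag.
    private final ExecutorService completer = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "delayed-request-completer");
        t.setDaemon(true);
        return t;
    });
    private final Runnable completeDelayedRequests;

    AppendPath(Runnable completeDelayedRequests) {
        this.completeDelayedRequests = completeDelayedRequests;
    }

    // Appends the records (elided here), then completes delayed requests either
    // inline on the caller's thread or on the background thread.
    CompletableFuture<Void> appendRecords(List<String> records, boolean completeInBackground) {
        if (completeInBackground) {
            // The appending thread (which may hold a group lock) does not run
            // the completion itself.
            return CompletableFuture.runAsync(completeDelayedRequests, completer);
        }
        completeDelayedRequests.run();
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public void close() {
        completer.shutdown();
    }
}
```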







[jira] [Updated] (KAFKA-10162) Use Token Bucket algorithm for controller mutation quota

2020-08-05 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-10162:

Summary: Use Token Bucket algorithm for controller mutation quota  (was: 
Update Rate implementation to cope with spiky workload)

> Use Token Bucket algorithm for controller mutation quota
> 
>
> Key: KAFKA-10162
> URL: https://issues.apache.org/jira/browse/KAFKA-10162
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Priority: Major
>






[jira] [Created] (KAFKA-10364) Use Token Bucket for all quotas

2020-08-05 Thread David Jacot (Jira)
David Jacot created KAFKA-10364:
---

 Summary: Use Token Bucket for all quotas
 Key: KAFKA-10364
 URL: https://issues.apache.org/jira/browse/KAFKA-10364
 Project: Kafka
  Issue Type: Sub-task
Reporter: David Jacot
Assignee: David Jacot








[GitHub] [kafka] dajac commented on pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

2020-08-05 Thread GitBox


dajac commented on pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#issuecomment-669343564


   @junrao Thanks for your comments. I just pushed an update which addressed 
them.







[GitHub] [kafka] mumrah edited a comment on pull request #7222: KAFKA-8806 Reduce calls to validateOffsetsIfNeeded

2020-08-05 Thread GitBox


mumrah edited a comment on pull request #7222:
URL: https://github.com/apache/kafka/pull/7222#issuecomment-669345762


   
![image](https://user-images.githubusercontent.com/55116/89447599-3e2aac80-d724-11ea-8d0d-a4a4857f230c.png)
   
   Failed run https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1860/







[GitHub] [kafka] mumrah commented on pull request #7222: KAFKA-8806 Reduce calls to validateOffsetsIfNeeded

2020-08-05 Thread GitBox


mumrah commented on pull request #7222:
URL: https://github.com/apache/kafka/pull/7222#issuecomment-669345762


   
![image](https://user-images.githubusercontent.com/55116/89447599-3e2aac80-d724-11ea-8d0d-a4a4857f230c.png)
   







[GitHub] [kafka] mumrah commented on pull request #7222: KAFKA-8806 Reduce calls to validateOffsetsIfNeeded

2020-08-05 Thread GitBox


mumrah commented on pull request #7222:
URL: https://github.com/apache/kafka/pull/7222#issuecomment-669345934


   retest this please







[GitHub] [kafka] dielhennr commented on a change in pull request #9101: KAFKA-10325: KIP-649 implementation

2020-08-05 Thread GitBox


dielhennr commented on a change in pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#discussion_r465907401



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicConsumerConfig.java
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.DynamicClientConfigUpdater;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.DescribeConfigsRequest;
+import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+/**
+ * Handles the request and response of a dynamic client configuration update for the consumer
+ */
+public class DynamicConsumerConfig extends DynamicClientConfigUpdater {
+/* Client to use */
+private ConsumerNetworkClient client;
+
+/* Configs to update */
+private GroupRebalanceConfig rebalanceConfig;
+
+/* Object to synchronize on when response is received */
+Object lock;
+
+/* Logger to use */
+private Logger log;
+
+/* The resource name to use when constructing a DescribeConfigsRequest */
+private final String clientId;
+
+/* Dynamic Configs received from the previous DescribeConfigsResponse */
+private Map previousDynamicConfigs;
+
+/* Indicates if we have received the initial dynamic configurations */
+private boolean initialConfigsFetched;
+
+public DynamicConsumerConfig(ConsumerNetworkClient client, Object lock, GroupRebalanceConfig config, Time time, LogContext logContext) {
+super(time);
+this.rebalanceConfig = config;
+this.log = logContext.logger(DynamicConsumerConfig.class);
+this.client = client;
+this.lock = lock;
+this.clientId = rebalanceConfig.clientId;
+this.previousDynamicConfigs = new HashMap<>();
+this.initialConfigsFetched = false;
+}
+
+/**
+ * Send a {@link DescribeConfigsRequest} to a node specifically for dynamic client configurations
+ *
+ * @return {@link RequestFuture} 
+ */ 
+public RequestFuture maybeFetchInitialConfigs() {
+if (!initialConfigsFetched) {
+Node node = null;
+while (node == null) {
+node = client.leastLoadedNode();

Review comment:
   The main thread calls this method, and the main thread is also the one polling. 
This was a hack to try to force the initial DescribeConfigs request before the 
JoinGroup request. `leastLoadedNode()` returns non-null when there is a connected node, a connecting 
node, or a node with no connection. It may be best to just fetch the configs and 
poll for them if the node is non-null. If it is null, then maybe skip the first 
synchronous DescribeConfigs RPC and fetch the configs asynchronously on the 
next try.









[GitHub] [kafka] dajac commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig

2020-08-05 Thread GitBox


dajac commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r465908873



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -2982,12 +3089,33 @@ class KafkaApis(val requestChannel: RequestChannel,
 logIfDenied: Boolean = true,
 refCount: Int = 1): Boolean = {
 authorizer.forall { authZ =>
-  val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
-  val actions = Collections.singletonList(new Action(operation, resource, refCount, logIfAllowed, logIfDenied))
-  authZ.authorize(requestContext, actions).get(0) == AuthorizationResult.ALLOWED
+  if (authorizeAction(requestContext, operation,
+resourceType, resourceName, logIfAllowed, logIfDenied, refCount, authZ)) {
+true
+  } else {
+operation match {
+  case ALTER | ALTER_CONFIGS | CREATE | DELETE =>
+requestContext.maybeFromControlPlane &&
+  authorizeAction(requestContext, CLUSTER_ACTION,
+resourceType, resourceName, logIfAllowed, logIfDenied, refCount, authZ)

Review comment:
   I wonder if this is correct. Usually, we use `CLUSTER_ACTION` action 
with the `CLUSTER` resource. For instance, this is how we authorize control 
requests:
   ```
   authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)
   ```
   I thought that we would do the same in this case. Don't we?
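
   To make the question concrete, here is a sketch of the fallback check using the cluster-level form of the authorization (`CLUSTER_ACTION` on the `CLUSTER` resource) rather than `CLUSTER_ACTION` on the request's own resource. The types (`Op`, `SketchAuthorizer`, string-encoded resources) are hypothetical and model one principal's grants; they are not Kafka's `Authorizer` API, and `kafka-cluster` as the cluster resource name is an assumption based on Kafka's usual convention.

```java
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, self-contained types; this is not Kafka's Authorizer API.
enum Op { ALTER, ALTER_CONFIGS, CREATE, DELETE, DESCRIBE, CLUSTER_ACTION }

// Models the ACLs of a single principal as "OPERATION@resource" strings.
final class SketchAuthorizer {
    // Assumed cluster resource name (Kafka's convention is "kafka-cluster").
    static final String CLUSTER = "cluster:kafka-cluster";
    private static final Set<Op> FORWARDABLE =
        EnumSet.of(Op.ALTER, Op.ALTER_CONFIGS, Op.CREATE, Op.DELETE);
    private final Set<String> grants = new HashSet<>();

    void grant(Op op, String resource) {
        grants.add(op + "@" + resource);
    }

    private boolean allowed(Op op, String resource) {
        return grants.contains(op + "@" + resource);
    }

    boolean authorize(Op op, String resource, boolean fromControlPlane) {
        if (allowed(op, resource)) {
            return true;
        }
        // Fallback for forwarded requests: check CLUSTER_ACTION on the CLUSTER
        // resource (as for control requests), not on the request's own resource.
        return FORWARDABLE.contains(op) && fromControlPlane && allowed(Op.CLUSTER_ACTION, CLUSTER);
    }
}
```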









[jira] [Commented] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics

2020-08-05 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17171670#comment-17171670
 ] 

Ning Zhang commented on KAFKA-10339:


It is true that SinkTaskContext.offsets() already covers this. I will test 
this updated idea out locally; the extra step is to create a "fake" 
consumer group on the target cluster.






[GitHub] [kafka] junrao commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

2020-08-05 Thread GitBox


junrao commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r465921001



##
File path: 
clients/src/test/java/org/apache/kafka/common/metrics/TokenBucketTest.java
##
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.stats.TokenBucket;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TokenBucketTest {
+Time time;
+
+@Before
+public void setup() {
+time = new MockTime(0, System.currentTimeMillis(), System.nanoTime());
+}
+
+@Test
+public void testRecord() {
+// Rate  = 5 unit / sec
+// Burst = 10 * 2 * 5 = 100 units
+MetricConfig config = new MetricConfig()
+.quota(Quota.upperBound(5))
+.timeWindow(2, TimeUnit.SECONDS)
+.samples(10);
+
+TokenBucket tk = new TokenBucket();
+
+// Expect 100 credits at T
+assertEquals(100, tk.measure(config, time.milliseconds()), 0.1);
+
+// Record 60 at T, expect 40 credits
+tk.record(config, 60, time.milliseconds());
+assertEquals(40, tk.measure(config, time.milliseconds()), 0.1);
+
+// Advance by 2s, record 5, expect 45 credits
+time.sleep(2000);
+tk.record(config, 5, time.milliseconds());
+assertEquals(45, tk.measure(config, time.milliseconds()), 0.1);
+
+// Advance by 2s, record 60, expect -5 credits
+time.sleep(2000);
+tk.record(config, 60, time.milliseconds());
+assertEquals(-5, tk.measure(config, time.milliseconds()), 0.1);
+}
+
+@Test
+public void testUnrecord() {
+// Rate  = 5 unit / sec
+// Burst = 2 * (11 - 1) = 20 units

Review comment:
   we have 10 samples now.
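
   For reference, with the test's configuration (quota 5 units/s, a 2 s window and 10 samples) and the burst formula in the TokenBucket javadoc (samples * window * quota), the expected values in `testRecord` work out as:

$$
\text{burst} = \text{samples} \times \text{window} \times \text{quota} = 10 \times 2\,\mathrm{s} \times 5\,\mathrm{units/s} = 100\ \text{units}
$$

$$
100 - 60 = 40, \qquad 40 + 2 \times 5 - 5 = 45, \qquad 45 + 2 \times 5 - 60 = -5
$$

   which matches the 100, 40, 45 and -5 asserted in the test.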

##
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Quota;
+
+import static org.apache.kafka.common.metrics.internals.MetricsUtils.convert;
+
+/**
+ * The {@link TokenBucket} is a {@link MeasurableStat} implementing a token bucket algorithm
+ * that is usable within a {@link org.apache.kafka.common.metrics.Sensor}.
+ *
+ * The {@link Quota#bound()} defines the refill rate of the bucket while the maximum burst or
+ * the maximum number of credits of the bucket is defined by
+ * {@link MetricConfig#samples() * MetricConfig#timeWindowMs() * Quota#bound()}.
+ *
+ * The quota is considered as exhausted when the amount of remaining credits in the bucket

Review comment:
   Could we document how this quota behaves differently from the existing quotas?

##
File path: 
clients/src/test/java/org/apache/kafka/common/metrics/TokenBucketTest.java
##
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional informatio

[GitHub] [kafka] junrao commented on pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

2020-08-05 Thread GitBox


junrao commented on pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#issuecomment-669378432


   ok to test







[GitHub] [kafka] dhruvilshah3 commented on a change in pull request #9110: MINOR: Ensure a reason is logged for all segment deletion operations

2020-08-05 Thread GitBox


dhruvilshah3 commented on a change in pull request #9110:
URL: https://github.com/apache/kafka/pull/9110#discussion_r465939741



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -2227,14 +2210,17 @@ class Log(@volatile private var _dir: File,
* @param segments The log segments to schedule for deletion
* @param asyncDelete Whether the segment files should be deleted asynchronously
*/
-  private def removeAndDeleteSegments(segments: Iterable[LogSegment], asyncDelete: Boolean): Unit = {
+  private def removeAndDeleteSegments(segments: Iterable[LogSegment],
+  asyncDelete: Boolean,
+  reason: SegmentDeletionReason): Unit = {
 if (segments.nonEmpty) {
   lock synchronized {
 // As most callers hold an iterator into the `segments` collection and `removeAndDeleteSegment` mutates it by
 // removing the deleted segment, we should force materialization of the iterator here, so that results of the
 // iteration remain valid and deterministic.
 val toDelete = segments.toList
 toDelete.foreach { segment =>
+  info(s"${reason.reasonString(this, segment)}")

Review comment:
   I think this is reasonable. Logging one segment per line will make it 
easier for us to diagnose issues. I made the change to log one segment per line 
for retention-related deletions. We still batch all segments in a single line 
for all other deletion events, such as log deletion and truncation.
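
   A minimal sketch of the split described here: one log line per segment for retention-driven deletions, and a single batched line otherwise. `DeletionReason`, `SegmentDeletionLogger` and the message wording are hypothetical; the PR itself builds its messages through `SegmentDeletionReason.reasonString(...)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical reasons; only the per-segment vs. batched split mirrors the comment.
enum DeletionReason {
    RETENTION_TIME(true), RETENTION_SIZE(true), LOG_DELETION(false), TRUNCATION(false);

    final boolean perSegment; // log one line per segment?

    DeletionReason(boolean perSegment) {
        this.perSegment = perSegment;
    }
}

final class SegmentDeletionLogger {
    // Returns the log lines that would be emitted for deleting segments with the
    // given base offsets (the message wording is illustrative).
    static List<String> logLines(List<Long> baseOffsets, DeletionReason reason) {
        List<String> lines = new ArrayList<>();
        if (baseOffsets.isEmpty()) {
            return lines;
        }
        if (reason.perSegment) {
            for (long baseOffset : baseOffsets) {
                lines.add("Deleting segment with base offset " + baseOffset + " due to " + reason);
            }
        } else {
            String all = baseOffsets.stream().map(Object::toString).collect(Collectors.joining(", "));
            lines.add("Deleting segments [" + all + "] due to " + reason);
        }
        return lines;
    }
}
```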









[GitHub] [kafka] dielhennr commented on a change in pull request #9101: KAFKA-10325: KIP-649 implementation

2020-08-05 Thread GitBox


dielhennr commented on a change in pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#discussion_r465907401



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicConsumerConfig.java
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.DynamicClientConfigUpdater;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.DescribeConfigsRequest;
+import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+/**
+ * Handles the request and response of a dynamic client configuration update for the consumer
+ */
+public class DynamicConsumerConfig extends DynamicClientConfigUpdater {
+/* Client to use */
+private ConsumerNetworkClient client;
+
+/* Configs to update */
+private GroupRebalanceConfig rebalanceConfig;
+
+/* Object to synchronize on when response is received */
+Object lock;
+
+/* Logger to use */
+private Logger log;
+
+/* The resource name to use when constructing a DescribeConfigsRequest */
+private final String clientId;
+
+/* Dynamic Configs received from the previous DescribeConfigsResponse */
+private Map previousDynamicConfigs;
+
+/* Indicates if we have received the initial dynamic configurations */
+private boolean initialConfigsFetched;
+
+public DynamicConsumerConfig(ConsumerNetworkClient client, Object lock, GroupRebalanceConfig config, Time time, LogContext logContext) {
+super(time);
+this.rebalanceConfig = config;
+this.log = logContext.logger(DynamicConsumerConfig.class);
+this.client = client;
+this.lock = lock;
+this.clientId = rebalanceConfig.clientId;
+this.previousDynamicConfigs = new HashMap<>();
+this.initialConfigsFetched = false;
+}
+
+/**
+ * Send a {@link DescribeConfigsRequest} to a node specifically for dynamic client configurations
+ *
+ * @return {@link RequestFuture} 
+ */ 
+public RequestFuture maybeFetchInitialConfigs() {
+if (!initialConfigsFetched) {
+Node node = null;
+while (node == null) {
+node = client.leastLoadedNode();

Review comment:
   The main thread calls this method, and the main thread is also the one polling. 
This was a hack to try to force the initial DescribeConfigs request before the 
JoinGroup request. `leastLoadedNode()` returns non-null when there is a connected node, a connecting 
node, or a node with no connection. It may be best to just fetch the configs and 
poll for them if the node is non-null and ready to send a request to. If it is 
null or not ready, then maybe skip the first synchronous DescribeConfigs RPC and 
fetch the configs asynchronously on the next try.
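
   A sketch of the suggested flow, replacing the busy `while (node == null)` loop with a single attempt: fetch synchronously only when a ready node is available, otherwise fall back to an asynchronous fetch. `NodeSelector`, `ConfigFetcher` and `InitialConfigFetch` are hypothetical stand-ins, not `ConsumerNetworkClient` methods.

```java
import java.util.Optional;

// Hypothetical stand-ins; these are not ConsumerNetworkClient's real methods.
interface NodeSelector {
    Optional<String> leastLoadedNode(); // empty when there is no candidate node
    boolean isReady(String node);       // true when a request can be sent right away
}

interface ConfigFetcher {
    void fetchBlocking(String node); // send DescribeConfigs and poll until the response arrives
    void fetchAsync();               // let a later poll fetch the configs
}

final class InitialConfigFetch {
    private boolean initialAttemptMade = false;

    // Returns true only if the initial configs were fetched synchronously.
    boolean maybeFetchInitialConfigs(NodeSelector selector, ConfigFetcher fetcher) {
        if (initialAttemptMade) {
            return false;
        }
        initialAttemptMade = true;
        Optional<String> node = selector.leastLoadedNode();
        if (node.isPresent() && selector.isReady(node.get())) {
            fetcher.fetchBlocking(node.get());
            return true;
        }
        // No ready node: skip the synchronous RPC instead of spinning in a loop.
        fetcher.fetchAsync();
        return false;
    }
}
```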









[GitHub] [kafka] rajinisivaram commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-05 Thread GitBox


rajinisivaram commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r465952166



##
File path: core/src/main/scala/kafka/admin/ConfigCommand.scala
##
@@ -508,7 +563,15 @@ object ConfigCommand extends Config {
 
   val entityStr = (entitySubstr(ClientQuotaEntity.USER) ++ entitySubstr(ClientQuotaEntity.CLIENT_ID)).mkString(", ")
   val entriesStr = entries.asScala.map(e => s"${e._1}=${e._2}").mkString(", ")
-  println(s"Configs for ${entityStr} are ${entriesStr}")
+  println(s"Quota configs for ${entityStr} are ${entriesStr}")
+}
+// we describe user SCRAM credentials only when we are not describing client information
+// and we are not given either --entity-default or --user-defaults
+if (!entityTypes.contains(ConfigType.Client) && !entityNames.contains("")) {
+  getUserScramCredentialConfigs(adminClient, entityNames).foreach { case (user, description) =>

Review comment:
   What do we do if the user is not a SCRAM user? Won't this throw an 
exception? Can we make sure that describing a user with neither a quota nor a 
SCRAM credential doesn't print any errors or exceptions?

##
File path: core/src/main/scala/kafka/server/AdminManager.scala
##
@@ -980,4 +984,137 @@ class AdminManager(val config: KafkaConfig,
   entry.entity -> apiError
 }.toMap
   }
+
+  def describeUserScramCredentials(users: Option[Seq[String]]): DescribeUserScramCredentialsResponseData = {
+val retval = new DescribeUserScramCredentialsResponseData()
+
+def addToResults(user: String, userConfig: Properties) = {
+  val configKeys = userConfig.stringPropertyNames
+  val hasScramCredential = !ScramMechanism.values().toList.filter(key => key != ScramMechanism.UNKNOWN && configKeys.contains(key.getMechanismName)).isEmpty
+  if (hasScramCredential) {
+val userScramCredentials = new UserScramCredential().setName(user)
+ScramMechanism.values().foreach(mechanism => if (mechanism != ScramMechanism.UNKNOWN) {
+  val propertyValue = userConfig.getProperty(mechanism.getMechanismName)
+  if (propertyValue != null) {
+val iterations = ScramCredentialUtils.credentialFromString(propertyValue).iterations
+userScramCredentials.credentialInfos.add(new CredentialInfo().setMechanism(mechanism.getType).setIterations(iterations))
+  }
+})
+retval.userScramCredentials.add(userScramCredentials)
+  }
+}
+
+if (!users.isDefined || users.get.isEmpty)
+  // describe all users
+  adminZkClient.fetchAllEntityConfigs(ConfigType.User).foreach { case (user, properties) => addToResults(user, properties) }
+else {
+  // describe specific users
+  // https://stackoverflow.com/questions/24729544/how-to-find-duplicates-in-a-list
+  val duplicatedUsers = users.get.groupBy(identity).collect { case (x, Seq(_, _, _*)) => x }
+  if (duplicatedUsers.nonEmpty) {
+retval.setError(Errors.INVALID_REQUEST.code())
+retval.setErrorMessage(s"Cannot describe SCRAM credentials for the same user twice in a single request: ${duplicatedUsers.mkString("[", ", ", "]")}")
+  } else
+users.get.foreach { user => addToResults(user, adminZkClient.fetchEntityConfig(ConfigType.User, Sanitizer.sanitize(user))) }

Review comment:
   should we catch exception?
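The duplicate-user check in `describeUserScramCredentials` above uses Scala's `groupBy(identity)` and keeps every key whose group has at least two elements (the `Seq(_, _, _*)` pattern). The following is a rough Java equivalent for readers more at home on the clients side. It is an illustrative sketch, not part of the PR. `DuplicateUsersSketch` and `duplicatedUsers` are hypothetical names, and keeping first-seen order is a choice of this sketch; the Scala `groupBy` makes no ordering promise.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class DuplicateUsersSketch {

    // Return every user name that occurs more than once, in first-seen order.
    static List<String> duplicatedUsers(List<String> users) {
        // Count occurrences per name; LinkedHashMap keeps insertion order.
        Map<String, Long> counts = users.stream()
                .collect(Collectors.groupingBy(Function.identity(), LinkedHashMap::new, Collectors.counting()));
        // Keep only names seen at least twice, like the Scala Seq(_, _, _*) pattern.
        return counts.entrySet().stream()
                .filter(entry -> entry.getValue() > 1)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> users = Arrays.asList("alice", "bob", "alice", "carol", "bob", "alice");
        // Same bracketed list format as the Scala error message.
        System.out.println("Cannot describe SCRAM credentials for the same user twice in a single request: "
                + duplicatedUsers(users)); // ...request: [alice, bob]
    }
}
```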

##
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) {
 return new AlterClientQuotasResult(Collections.unmodifiableMap(futures));
 }
 
+@Override
+public DescribeUserScramCredentialsResult describeUserScramCredentials(List<String> users, DescribeUserScramCredentialsOptions options) {
+final KafkaFutureImpl> future = new KafkaFutureImpl<>();
+final long now = time.milliseconds();
+Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()),
+new ControllerNodeProvider()) {
+@Override
+public DescribeUserScramCredentialsRequest.Builder createRequest(int timeoutMs) {
+return new DescribeUserScramCredentialsRequest.Builder(
+new DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user ->
+new DescribeUserScramCredentialsRequestData.UserName().setName(user)).collect(Collectors.toList())));
+}
+
+@Override
+public void handleResponse(AbstractResponse abstractResponse) {
+DescribeUserScramCredentialsResponse response = (DescribeUserScramCredentialsResponse) abstractResponse;
+Errors error = Errors.forCode(response.data().error());
+switch (error) {
+case NONE:
+DescribeUserScramCredentialsResponseData data = respons

[GitHub] [kafka] dajac commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

2020-08-05 Thread GitBox


dajac commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r465968369



##
File path: clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
##
@@ -209,4 +212,95 @@ public void shouldReturnPresenceOfMetrics() {
 
 assertThat(sensor.hasMetrics(), is(true));
 }
+
+@Test
+public void testStrictQuotaEnforcementWithRate() {
+final Time time = new MockTime(0, System.currentTimeMillis(), 0);
+final Metrics metrics = new Metrics(time);
+final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+.quota(Quota.upperBound(2))
+.timeWindow(1, TimeUnit.SECONDS)
+.samples(11));

Review comment:
   The config is correct: 11 samples. With the few samples in the test, the 
total window is actually 10s, which is why I use 10 in the formulas.
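The following sketch shows why 11 samples of 1s can yield a 10s window. A sampled rate can pad its window by always counting at least `samples - 1` full windows, and adding the missing ones when less time has actually elapsed. This mirrors my understanding of `Rate.windowSize` in `org.apache.kafka.common.metrics.stats`, but treat it as an approximation rather than the library code. `RateWindowSketch` and `effectiveWindowMs` are hypothetical names.

```java
public class RateWindowSketch {

    /**
     * Effective window (ms) used to turn a sampled total into a rate.
     * Assumption: at least (samples - 1) full windows are always counted,
     * plus whatever time has actually elapsed beyond them.
     */
    static long effectiveWindowMs(long elapsedSinceOldestSampleMs, int samples, long timeWindowMs) {
        int numFullWindows = (int) (elapsedSinceOldestSampleMs / timeWindowMs);
        int minFullWindows = samples - 1;
        long windowMs = elapsedSinceOldestSampleMs;
        if (numFullWindows < minFullWindows) {
            // Pad with the missing full windows so a short burst is not divided by a tiny window.
            windowMs += (minFullWindows - numFullWindows) * timeWindowMs;
        }
        return windowMs;
    }

    public static void main(String[] args) {
        // samples = 11, timeWindow = 1s, matching the MetricConfig in the test above
        System.out.println(effectiveWindowMs(0, 11, 1000));      // 10000
        System.out.println(effectiveWindowMs(500, 11, 1000));    // 10500
        System.out.println(effectiveWindowMs(15_000, 11, 1000)); // 15000
    }
}
```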









[GitHub] [kafka] dajac commented on pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

2020-08-05 Thread GitBox


dajac commented on pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#issuecomment-669465274


   @junrao Thanks. I have updated the PR.







[GitHub] [kafka] guozhangwang commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-08-05 Thread GitBox


guozhangwang commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r465923769



##
File path: docs/streams/developer-guide/config-streams.html
##
@@ -326,13 +321,18 @@ bootstrap.servers
 state.cleanup.delay.ms
 Low
 The amount of time in milliseconds to wait before deleting state when a partition has migrated.
-600000 milliseconds
+600000 milliseconds (10 minutes)
   
   state.dir
 High
 Directory location for state stores.
 /tmp/kafka-streams
   
+  task.timeout.ms
+Medium

Review comment:
   "Medium" looks fine to me, but I don't feel strongly about it either way.









[GitHub] [kafka] mjsax merged pull request #9126: MINOR: Code cleanup in StreamsResetter

2020-08-05 Thread GitBox


mjsax merged pull request #9126:
URL: https://github.com/apache/kafka/pull/9126


   







[GitHub] [kafka] junrao commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

2020-08-05 Thread GitBox


junrao commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r465989800



##
File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Quota;
+
+import static org.apache.kafka.common.metrics.internals.MetricsUtils.convert;
+
+/**
+ * The {@link TokenBucket} is a {@link MeasurableStat} implementing a token bucket algorithm
+ * that is usable within a {@link org.apache.kafka.common.metrics.Sensor}.
+ *
+ * The {@link Quota#bound()} defines the refill rate of the bucket while the maximum burst or
+ * the maximum number of credits of the bucket is defined by
+ * {@link MetricConfig#samples() * MetricConfig#timeWindowMs() * Quota#bound()}.
+ *
+ * The quota is considered as exhausted when the amount of remaining credits in the bucket
+ * is below zero. The enforcement is done by the {@link org.apache.kafka.common.metrics.Sensor}.
+ *
+ * Token Bucket vs Rate based Quota:
+ * The current sampled rate based quota does not cope well with bursty workloads. The issue is
+ * that a unique and large sample can hold the average above the quota and this until it is

Review comment:
   "this until it is" doesn't quite parse.

##
File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Quota;
+
+import static org.apache.kafka.common.metrics.internals.MetricsUtils.convert;
+
+/**
+ * The {@link TokenBucket} is a {@link MeasurableStat} implementing a token bucket algorithm
+ * that is usable within a {@link org.apache.kafka.common.metrics.Sensor}.
+ *
+ * The {@link Quota#bound()} defines the refill rate of the bucket while the maximum burst or
+ * the maximum number of credits of the bucket is defined by
+ * {@link MetricConfig#samples() * MetricConfig#timeWindowMs() * Quota#bound()}.
+ *
+ * The quota is considered as exhausted when the amount of remaining credits in the bucket
+ * is below zero. The enforcement is done by the {@link org.apache.kafka.common.metrics.Sensor}.
+ *
+ * Token Bucket vs Rate based Quota:
+ * The current sampled rate based quota does not cope well with bursty workloads. The issue is
+ * that a unique and large sample can hold the average above the quota and this until it is
+ * discarded. Practically, when this happens, one must wait until the sample is expired to
+ * bring the rate below the quota even though less time would be theoretically required. As an
+ * example, let's imagine that we have:
+ * - Quota (Q)   = 5
+ * - Samples (S) = 100
+ * - Window (W)  = 1s
+ * A burst of 560 brings the average rate (R) to 5.6 (560 / 100). The throttle time is computed as

Review comment:
   "The throttle time " : I guess this is the expected throttle time?

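The javadoc above defines the refill rate as `Quota#bound()` and the bucket capacity as `samples * timeWindow * bound`. The quota counts as exhausted once the remaining credits drop below zero. A minimal token bucket following those definitions is sketched below. The javadoc's own throttle-time formula is cut off in this excerpt, and the sketch does not claim to reproduce it.

The sketch makes these assumptions:
- The bucket starts full.
- Time is passed in explicitly.
- The expected throttle time is the time needed to refill the deficit back to zero at the quota rate.

`TokenBucketSketch` is hypothetical and is not Kafka's `TokenBucket` class. With the javadoc's numbers (Q = 5, S = 100, W = 1s), the capacity is 500. A burst of 560 therefore leaves a deficit of 60, which refills in 60 / 5 = 12s.

```java
public class TokenBucketSketch {
    private final double quotaPerSec; // Q: refill rate, in units per second
    private final double maxCredits;  // burst capacity: S * W (in seconds) * Q
    private double credits;
    private long lastUpdateMs;

    TokenBucketSketch(double quotaPerSec, int samples, long windowMs, long nowMs) {
        this.quotaPerSec = quotaPerSec;
        this.maxCredits = samples * (windowMs / 1000.0) * quotaPerSec;
        this.credits = maxCredits; // assumption: the bucket starts full
        this.lastUpdateMs = nowMs;
    }

    // Add credits for the time elapsed since the last update, capped at the capacity.
    private void refill(long nowMs) {
        double elapsedSec = (nowMs - lastUpdateMs) / 1000.0;
        credits = Math.min(maxCredits, credits + elapsedSec * quotaPerSec);
        lastUpdateMs = nowMs;
    }

    // Consume credits for a recorded value; credits may go negative.
    void record(double value, long nowMs) {
        refill(nowMs);
        credits -= value;
    }

    // The quota counts as exhausted when the remaining credits are below zero.
    boolean exhausted(long nowMs) {
        refill(nowMs);
        return credits < 0;
    }

    // Time for the deficit to be refilled back to zero at rate Q.
    long expectedThrottleMs(long nowMs) {
        refill(nowMs);
        return credits >= 0 ? 0 : Math.round(-credits / quotaPerSec * 1000);
    }

    public static void main(String[] args) {
        TokenBucketSketch bucket = new TokenBucketSketch(5, 100, 1000, 0);
        bucket.record(560, 0);                            // 500 - 560 = -60 credits
        System.out.println(bucket.exhausted(0));          // true
        System.out.println(bucket.expectedThrottleMs(0)); // 12000
        System.out.println(bucket.exhausted(12_000));     // false: 12s * 5/s refilled the 60 deficit
    }
}
```

Compared with the sampled rate, where the 560 burst keeps the average at 5.6 until the sample expires, the bucket recovers as soon as the deficit has been refilled.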





[GitHub] [kafka] mjsax commented on pull request #9127: MINOR: fix HTML

2020-08-05 Thread GitBox


mjsax commented on pull request #9127:
URL: https://github.com/apache/kafka/pull/9127#issuecomment-669498317


   Found one more tiny issue. Will merge if no objections are raised.
   
   kafka-site PR: https://github.com/apache/kafka-site/pull/283







[GitHub] [kafka] junrao commented on pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

2020-08-05 Thread GitBox


junrao commented on pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#issuecomment-669499409


   ok to test







[GitHub] [kafka] junrao commented on pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

2020-08-05 Thread GitBox


junrao commented on pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#issuecomment-669499767


   retest this please







[GitHub] [kafka] junrao commented on pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

2020-08-05 Thread GitBox


junrao commented on pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#issuecomment-669500012


   test this please







[GitHub] [kafka] mjsax merged pull request #9067: MINOR: Streams integration tests should not call exit

2020-08-05 Thread GitBox


mjsax merged pull request #9067:
URL: https://github.com/apache/kafka/pull/9067


   







[GitHub] [kafka] dajac commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

2020-08-05 Thread GitBox


dajac commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r465997810



##
File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Quota;
+
+import static org.apache.kafka.common.metrics.internals.MetricsUtils.convert;
+
+/**
+ * The {@link TokenBucket} is a {@link MeasurableStat} implementing a token bucket algorithm
+ * that is usable within a {@link org.apache.kafka.common.metrics.Sensor}.
+ *
+ * The {@link Quota#bound()} defines the refill rate of the bucket while the maximum burst or
+ * the maximum number of credits of the bucket is defined by
+ * {@link MetricConfig#samples() * MetricConfig#timeWindowMs() * Quota#bound()}.
+ *
+ * The quota is considered as exhausted when the amount of remaining credits in the bucket
+ * is below zero. The enforcement is done by the {@link org.apache.kafka.common.metrics.Sensor}.
+ *
+ * Token Bucket vs Rate based Quota:
+ * The current sampled rate based quota does not cope well with bursty workloads. The issue is
+ * that a unique and large sample can hold the average above the quota and this until it is
+ * discarded. Practically, when this happens, one must wait until the sample is expired to
+ * bring the rate below the quota even though less time would be theoretically required. As an
+ * example, let's imagine that we have:
+ * - Quota (Q)   = 5
+ * - Samples (S) = 100
+ * - Window (W)  = 1s
+ * A burst of 560 brings the average rate (R) to 5.6 (560 / 100). The throttle time is computed as

Review comment:
   Yes, that is correct. I have added "expected" to be clearer.









[GitHub] [kafka] dajac commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

2020-08-05 Thread GitBox


dajac commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r465997624



##
File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Quota;
+
+import static org.apache.kafka.common.metrics.internals.MetricsUtils.convert;
+
+/**
+ * The {@link TokenBucket} is a {@link MeasurableStat} implementing a token bucket algorithm
+ * that is usable within a {@link org.apache.kafka.common.metrics.Sensor}.
+ *
+ * The {@link Quota#bound()} defines the refill rate of the bucket while the maximum burst or
+ * the maximum number of credits of the bucket is defined by
+ * {@link MetricConfig#samples() * MetricConfig#timeWindowMs() * Quota#bound()}.
+ *
+ * The quota is considered as exhausted when the amount of remaining credits in the bucket
+ * is below zero. The enforcement is done by the {@link org.apache.kafka.common.metrics.Sensor}.
+ *
+ * Token Bucket vs Rate based Quota:
+ * The current sampled rate based quota does not cope well with bursty workloads. The issue is
+ * that a unique and large sample can hold the average above the quota and this until it is

Review comment:
   I have removed the "and this". Does it parse better?









[GitHub] [kafka] abbccdda commented on a change in pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager

2020-08-05 Thread GitBox


abbccdda commented on a change in pull request #9060:
URL: https://github.com/apache/kafka/pull/9060#discussion_r465999685



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -373,8 +373,15 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
 
 final Set statefulTasks = new HashSet<>();
 
-final boolean probingRebalanceNeeded =
-assignTasksToClients(fullMetadata, allSourceTopics, topicGroups, clientMetadataMap, partitionsForTask, statefulTasks);
+final boolean probingRebalanceNeeded;
+try {
+probingRebalanceNeeded = assignTasksToClients(fullMetadata, allSourceTopics, topicGroups, clientMetadataMap, partitionsForTask, statefulTasks);
+} catch (final TaskAssignmentException | TimeoutException e) {
+return new GroupAssignment(

Review comment:
   Since we could throw different exceptions here, it would be good to add a 
log line indicating which type of exception was thrown.









[GitHub] [kafka] mjsax commented on pull request #9067: MINOR: Streams integration tests should not call exit

2020-08-05 Thread GitBox


mjsax commented on pull request #9067:
URL: https://github.com/apache/kafka/pull/9067#issuecomment-669509500


   Merged to `trunk` and cherry-picked to `2.6` branch.







[GitHub] [kafka] junrao commented on pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

2020-08-05 Thread GitBox


junrao commented on pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#issuecomment-669510023


   test this please







[GitHub] [kafka] junrao commented on pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

2020-08-05 Thread GitBox


junrao commented on pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#issuecomment-669509903


   retest this please







[GitHub] [kafka] dajac commented on pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

2020-08-05 Thread GitBox


dajac commented on pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#issuecomment-669512698


   Sure. I will update the KIP and the mailing list tomorrow. Thanks, Jun!







[GitHub] [kafka] mjsax commented on a change in pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager

2020-08-05 Thread GitBox


mjsax commented on a change in pull request #9060:
URL: https://github.com/apache/kafka/pull/9060#discussion_r466006589



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -373,8 +373,15 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
 
 final Set statefulTasks = new HashSet<>();
 
-final boolean probingRebalanceNeeded =
-assignTasksToClients(fullMetadata, allSourceTopics, topicGroups, clientMetadataMap, partitionsForTask, statefulTasks);
+final boolean probingRebalanceNeeded;
+try {
+probingRebalanceNeeded = assignTasksToClients(fullMetadata, allSourceTopics, topicGroups, clientMetadataMap, partitionsForTask, statefulTasks);
+} catch (final TaskAssignmentException | TimeoutException e) {
+return new GroupAssignment(

Review comment:
   There are already corresponding log.error statements before those 
exceptions are thrown. No need to log twice, IMHO?









[GitHub] [kafka] mjsax merged pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-08-05 Thread GitBox


mjsax merged pull request #9047:
URL: https://github.com/apache/kafka/pull/9047


   







[jira] [Created] (KAFKA-10365) Cannot delete topic in Windows

2020-08-05 Thread Nilesh Balu Jorwar (Jira)
Nilesh Balu Jorwar created KAFKA-10365:
--

 Summary: Cannot delete topic in Windows
 Key: KAFKA-10365
 URL: https://issues.apache.org/jira/browse/KAFKA-10365
 Project: Kafka
  Issue Type: Bug
Reporter: Nilesh Balu Jorwar


Even after checking out and building the latest Kafka source code, the problem 
persists: after starting ZooKeeper and the server and deleting a previously 
created topic, the server fails with the error below:

*Suppressed: java.nio.file.AccessDeniedException: E:\nil\logs\kf-logs\javatechie2-0 -> E:\nil\logs\kf-logs\javatechie2-0.793787296b094bb69b4fa3135584a6f4-delete
at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:83)
at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:301)
at sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
at java.nio.file.Files.move(Files.java:1395)
at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:911)
... 14 more
[2020-08-05 16:12:30,383] WARN [ReplicaManager broker=0] Stopping serving replicas in dir E:\nil\logs\kf-logs (kafka.server.ReplicaManager)
[2020-08-05 16:12:30,515] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(javatechie3-0, javatechie-0) (kafka.server.ReplicaFetcherManager)
[2020-08-05 16:12:30,517] INFO [ReplicaAlterLogDirsManager on broker 0] Removed fetcher for partitions Set(javatechie3-0, javatechie-0) (kafka.server.ReplicaAlterLogDirsManager)
[2020-08-05 16:12:30,525] WARN [ReplicaManager broker=0] Broker 0 stopped fetcher for partitions javatechie3-0,javatechie-0 and stopped moving logs for partitions  because they are in the failed log directory E:\nil\logs\kf-logs. (kafka.server.ReplicaManager)
[2020-08-05 16:12:30,527] WARN Stopping serving logs in dir E:\nil\logs\kf-logs (kafka.log.LogManager)
[2020-08-05 16:12:30,535] ERROR Shutdown broker because all log dirs in E:\nil\logs\kf-logs have failed (kafka.log.LogManager)
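The trace shows the failure surfacing from `Files.move` inside `Utils.atomicMoveWithFallback`, reported as a suppressed exception. A helper of that kind generally works as follows. It tries an atomic rename first. If that fails, it retries as a non-atomic move that replaces the target, attaching the first failure as suppressed if the retry also fails.

This is an illustrative sketch of that pattern, not Kafka's `Utils` code, and `AtomicMoveSketch` is a hypothetical name. It does not work around the Windows `AccessDeniedException`; it only shows where such an exception would be reported.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class AtomicMoveSketch {

    // Try an atomic rename first; if that fails, retry as a non-atomic move that
    // replaces the target. If the retry also fails, rethrow it with the atomic
    // attempt's failure attached as a suppressed exception.
    static void atomicMoveWithFallback(Path source, Path target) throws IOException {
        try {
            Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException atomicFailure) {
            try {
                Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
            } catch (IOException fallbackFailure) {
                fallbackFailure.addSuppressed(atomicFailure);
                throw fallbackFailure;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Rename a log-style directory to a "-delete" name inside a temp location.
        Path root = Files.createTempDirectory("move-sketch");
        Path source = Files.createDirectory(root.resolve("javatechie2-0"));
        Path target = root.resolve("javatechie2-0.example-delete");
        atomicMoveWithFallback(source, target);
        System.out.println(Files.isDirectory(target) && !Files.exists(source)); // true
    }
}
```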



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on pull request #9120: KAFKA-10316: Consider renaming getter method for Interactive Queries

2020-08-05 Thread GitBox


mjsax commented on pull request #9120:
URL: https://github.com/apache/kafka/pull/9120#issuecomment-669529543











[GitHub] [kafka] mjsax commented on pull request #9120: KAFKA-10316: Consider renaming getter method for Interactive Queries

2020-08-05 Thread GitBox


mjsax commented on pull request #9120:
URL: https://github.com/apache/kafka/pull/9120#issuecomment-669530846


   Retest this please.






