[jira] [Commented] (KAFKA-10266) Fix connector configs in docs to mention the correct default value inherited from worker configs

2020-07-30 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167681#comment-17167681
 ] 

Luke Chen commented on KAFKA-10266:
---

PR: https://github.com/apache/kafka/pull/9104

 

> Fix connector configs in docs to mention the correct default value inherited 
> from worker configs
> 
>
> Key: KAFKA-10266
> URL: https://issues.apache.org/jira/browse/KAFKA-10266
> Project: Kafka
>  Issue Type: Bug
>Reporter: Konstantine Karantasis
>Assignee: Luke Chen
>Priority: Major
>
>  
> Example: 
> [https://kafka.apache.org/documentation/#header.converter]
> has the correct default when it is mentioned as a worker property. 
> But under the source connector configs section, its default value is said to 
> be `null`. 
> Though that is correct in terms of implementation, it's confusing for users. 
> We should surface the correct defaults for configs that inherit (or otherwise 
> override) worker configs. 
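
For illustration, a minimal sketch of the inheritance being documented (the worker-level default below is the documented SimpleHeaderConverter; at the connector level the key is simply left unset):

{code}
# Worker config (e.g. connect-distributed.properties): documented default
header.converter=org.apache.kafka.connect.storage.SimpleHeaderConverter

# Connector config: header.converter defaults to null, which means "inherit
# the worker's header.converter", not a literal null converter.
{code}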



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

2020-07-30 Thread GitBox


ijuma commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-665828744


   What were the throughput numbers? I assume you meant the consumer perf test, 
not the console consumer.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda merged pull request #9081: KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should not block infinitively

2020-07-30 Thread GitBox


abbccdda merged pull request #9081:
URL: https://github.com/apache/kafka/pull/9081


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

2020-07-30 Thread GitBox


mumrah commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-665845747


   @ijuma you're right, I meant the consumer perf test. I updated my comment to 
clarify.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-07-30 Thread GitBox


abbccdda commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r462453975



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##
@@ -1214,6 +1215,71 @@ default AlterClientQuotasResult 
alterClientQuotas(Collection entries, 
AlterClientQuotasOptions options);
 
+/**
+ * Describes finalized as well as supported features. By default, the 
request is issued to any
+ * broker. It can be optionally directed only to the controller via 
DescribeFeaturesOptions
+ * parameter. This is particularly useful if the user requires strongly 
consistent reads of
+ * finalized features.
+ * 
+ * The following exceptions can be anticipated when calling {@code get()} 
on the future from the
+ * returned {@link DescribeFeaturesResult}:
+ * 
+ *   {@link org.apache.kafka.common.errors.TimeoutException}
+ *   If the request timed out before the describe operation could 
finish.
+ * 
+ * 
+ * @param options   the options to use
+ *
+ * @return  the {@link DescribeFeaturesResult} containing the 
result
+ */
+DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);

Review comment:
   Note that in the post-KIP-500 world this feature could still work, but the 
request must be redirected to the controller internally on the broker side 
instead of being sent there directly. So in the comment, we may try to phrase it 
to convey that the principle is `the request must be handled by the controller` 
rather than `the admin client must send this request to the controller`. 
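
   For readers following along, a hedged usage sketch of the API quoted above; the `featureMetadata()` accessor name is an assumption taken from KIP-584 and is not shown in this diff:
   
   ```java
   import java.util.Properties;
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   import org.apache.kafka.clients.admin.DescribeFeaturesOptions;
   
   public class DescribeFeaturesExample {
       public static void main(String[] args) throws Exception {
           Properties props = new Properties();
           props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           try (Admin admin = Admin.create(props)) {
               // get() can surface a TimeoutException (wrapped in an ExecutionException)
               // if the describe operation does not finish in time, as the javadoc notes.
               System.out.println(
                   admin.describeFeatures(new DescribeFeaturesOptions())
                        .featureMetadata()   // accessor name assumed
                        .get());
           }
       }
   }
   ```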

##
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##
@@ -1214,6 +1215,70 @@ default AlterClientQuotasResult 
alterClientQuotas(Collection entries, 
AlterClientQuotasOptions options);
 
+/**
+ * Describes finalized as well as supported features. By default, the 
request is issued to any
+ * broker. It can be optionally directed only to the controller via 
DescribeFeaturesOptions
+ * parameter. This is particularly useful if the user requires strongly 
consistent reads of
+ * finalized features.
+ * 
+ * The following exceptions can be anticipated when calling {@code get()} 
on the future from the
+ * returned {@link DescribeFeaturesResult}:
+ * 
+ *   {@link org.apache.kafka.common.errors.TimeoutException}
+ *   If the request timed out before the describe operation could 
finish.
+ * 
+ * 
+ * @param options   the options to use
+ *
+ * @return  the {@link DescribeFeaturesResult} containing the 
result
+ */
+DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
+
+/**
+ * Applies specified updates to finalized features. This operation is not 
transactional so it
+ * may succeed for some features while fail for others.
+ * 
+ * The API takes in a map of finalized feature name to {@link 
FeatureUpdate} that need to be

Review comment:
   nit: s/name/names

##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -983,8 +1144,25 @@ class KafkaController(val config: KafkaConfig,
*/
   private[controller] def sendUpdateMetadataRequest(brokers: Seq[Int], 
partitions: Set[TopicPartition]): Unit = {
 try {
+  val filteredBrokers = scala.collection.mutable.Set[Int]() ++ brokers
+  if (config.isFeatureVersioningEnabled) {
+def hasIncompatibleFeatures(broker: Broker): Boolean = {
+  val latestFinalizedFeatures = featureCache.get
+  if (latestFinalizedFeatures.isDefined) {
+BrokerFeatures.hasIncompatibleFeatures(broker.features, 
latestFinalizedFeatures.get.features)
+  } else {
+false
+  }
+}
+controllerContext.liveOrShuttingDownBrokers.foreach(broker => {
+  if (filteredBrokers.contains(broker.id) && 
hasIncompatibleFeatures(broker)) {

Review comment:
   I see. What would happen to a currently live broker if it couldn't get 
any metadata updates for a while? Will it shut itself down?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either exp

[GitHub] [kafka] abbccdda closed pull request #8940: KAFKA-10181: AlterConfig/IncrementalAlterConfig should route to the controller for non validation calls

2020-07-30 Thread GitBox


abbccdda closed pull request #8940:
URL: https://github.com/apache/kafka/pull/8940


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

2020-07-30 Thread GitBox


hachikuji commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-665938546







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon opened a new pull request #9104: KAFKA-10266: Update the connector config header.converter

2020-07-30 Thread GitBox


showuon opened a new pull request #9104:
URL: https://github.com/apache/kafka/pull/9104


   After my update, the original incorrect statement is removed:
   > By default, the SimpleHeaderConverter is used to . 
   
   It is replaced with the following, with a hyperlink to the worker 
config's header.converter section:
   >  By default, the value is null and the Connect config will be used.
   
   Also, the default value is updated to **Inherited from Connect config**.
   
   
![image](https://user-images.githubusercontent.com/43372967/88890433-337b9d80-d274-11ea-8549-7c487faef823.png)
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #9050: KAFKA-10193: Add preemption for controller events that have callbacks

2020-07-30 Thread GitBox


jsancio commented on a change in pull request #9050:
URL: https://github.com/apache/kafka/pull/9050#discussion_r462489390



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -2090,6 +2093,12 @@ case class ReplicaLeaderElection(
   callback: ElectLeadersCallback = _ => {}
 ) extends ControllerEvent {
   override def state: ControllerState = ControllerState.ManualLeaderBalance
+
+  override def preempt(): Unit = callback(
+partitionsFromAdminClientOpt.fold(Map.empty[TopicPartition, 
Either[ApiError, Int]]) { partitions =>

Review comment:
   Yeah. The value returned by the `fold` is passed to `callback`. 
`foreach` would return `Unit`.

##
File path: core/src/main/scala/kafka/controller/ControllerEventManager.scala
##
@@ -140,6 +143,9 @@ class ControllerEventManager(controllerId: Int,
 }
   }
 
+  // for testing
+  private[controller] def setControllerEventThread(thread: 
ControllerEventThread): Unit = this.thread = thread

Review comment:
   We can remove this since we have `private[controller] var thread` with 
the same visibility.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] apovzner commented on a change in pull request #9072: KAFKA-10162; Make the rate based quota behave more like a Token Bucket (KIP-599, Part III)

2020-07-30 Thread GitBox


apovzner commented on a change in pull request #9072:
URL: https://github.com/apache/kafka/pull/9072#discussion_r462645570



##
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+/**
+ * The {@link TokenBucket} is a {@link SampledStat} implementing a token 
bucket that can be used
+ * in conjunction with a {@link Rate} to enforce a quota.
+ *
+ * A token bucket accumulates tokens with a constant rate R, one token per 1/R 
second, up to a
+ * maximum burst size B. The burst size essentially means that we keep 
permission to do a unit of
+ * work for up to B / R seconds.
+ *
+ * The {@link TokenBucket} adapts this to fit within a {@link SampledStat}. It 
accumulates tokens
+ * in chunks of Q units by default (sample length * quota) instead of one 
token every 1/R second
+ * and expires in chunks of Q units too (when the oldest sample is expired).
+ *
+ * Internally, we achieve this behavior by not completing the current sample 
until we fill that
+ * sample up to Q (used all credits). Samples are filled up one after the 
others until the maximum
+ * number of samples is reached. If it is not possible to created a new 
sample, we accumulate in
+ * the last one until a new one can be created. The over used credits are 
spilled over to the new
+ * sample at when it is created. Every time a sample is purged, Q credits are 
made available.
+ *
+ * It is important to note that the maximum burst is not enforced in the class 
and depends on
+ * how the quota is enforced in the {@link Rate}.
+ */
+public class TokenBucket extends SampledStat {
+
+private final TimeUnit unit;
+
+/**
+ * Instantiates a new TokenBucket that works by default with a Quota 
{@link MetricConfig#quota()}
+ * in {@link TimeUnit#SECONDS}.
+ */
+public TokenBucket() {
+this(TimeUnit.SECONDS);
+}
+
+/**
+ * Instantiates a new TokenBucket that works with the provided time unit.
+ *
+ * @param unit The time unit of the Quota {@link MetricConfig#quota()}
+ */
+public TokenBucket(TimeUnit unit) {
+super(0);
+this.unit = unit;
+}
+
+@Override
+public void record(MetricConfig config, double value, long timeMs) {

Review comment:
   @junrao Regarding "With this change, if we record a large value, the 
observed effect of the value could last much longer than the number of samples." 
-- this will not happen with this approach. If we record a very large value, 
we never move to a bucket with a timestamp greater than the current timestamp 
(of the recording time). This approach can only add the value to older buckets 
that have not expired, never to the buckets "in the future". 
   
   For example, if we only have 2 samples and perSampleQuota = 5, and say we 
have already filled both buckets up to the quota: [5, 5]. If new requests arrive 
but the timestamp has not moved past the last bucket, we keep adding the value 
to the last bucket, for example reaching [5, 20] if we recorded 15. If the next 
recording happens after the time has moved past the last bucket, say we 
record 3, then the buckets will look like [20, 3].

##
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOU

[GitHub] [kafka] guozhangwang commented on pull request #9095: KAFKA-10321: fix infinite blocking for global stream thread startup

2020-07-30 Thread GitBox


guozhangwang commented on pull request #9095:
URL: https://github.com/apache/kafka/pull/9095#issuecomment-665967044


   LGTM. Please feel free to merge and cherry-pick.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #9104: KAFKA-10266: Update the connector config header.converter

2020-07-30 Thread GitBox


showuon commented on pull request #9104:
URL: https://github.com/apache/kafka/pull/9104#issuecomment-666170759


   @kkonstantine, could you help review this PR to correct the documentation? 
Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda merged pull request #9095: KAFKA-10321: fix infinite blocking for global stream thread startup

2020-07-30 Thread GitBox


abbccdda merged pull request #9095:
URL: https://github.com/apache/kafka/pull/9095


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah edited a comment on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

2020-07-30 Thread GitBox


mumrah edited a comment on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-665795947


   I ran the consumer perf test (at @hachikuji's suggestion) and took a 
profile. Throughput was around 500 MB/s both on trunk and on this branch.
   
   
![image](https://user-images.githubusercontent.com/55116/88832229-81be6d00-d19e-11ea-9ee9-51b6054a6731.png)
   
   Zoomed in a bit on the records part:
   
   
![image](https://user-images.githubusercontent.com/55116/88832276-93a01000-d19e-11ea-9293-a138c38f6ed3.png)
   
   This was with only a handful of partitions on a single broker (on my 
laptop), but it confirms that the new FetchResponse serialization is hitting 
the same sendfile path as the previous code.
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda merged pull request #9012: KAFKA-10270: A broker to controller channel manager

2020-07-30 Thread GitBox


abbccdda merged pull request #9012:
URL: https://github.com/apache/kafka/pull/9012


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on pull request #8854: KAFKA-10146, KAFKA-9066: Retain metrics for failed tasks (#8502)

2020-07-30 Thread GitBox


kkonstantine commented on pull request #8854:
URL: https://github.com/apache/kafka/pull/8854#issuecomment-665952918







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #9095: KAFKA-10321: fix infinite blocking for global stream thread startup

2020-07-30 Thread GitBox


abbccdda commented on pull request #9095:
URL: https://github.com/apache/kafka/pull/9095#issuecomment-665999727







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda opened a new pull request #9103: Add redirection for (Incremental)AlterConfig

2020-07-30 Thread GitBox


abbccdda opened a new pull request #9103:
URL: https://github.com/apache/kafka/pull/9103


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #8940: KAFKA-10181: AlterConfig/IncrementalAlterConfig should route to the controller for non validation calls

2020-07-30 Thread GitBox


abbccdda commented on pull request #8940:
URL: https://github.com/apache/kafka/pull/8940#issuecomment-666091752


   Will close this PR as the KIP-590 requirements have changed.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah opened a new pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-07-30 Thread GitBox


mumrah opened a new pull request #9100:
URL: https://github.com/apache/kafka/pull/9100


   WIP



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #9072: KAFKA-10162; Make the rate based quota behave more like a Token Bucket (KIP-599, Part III)

2020-07-30 Thread GitBox


junrao commented on a change in pull request #9072:
URL: https://github.com/apache/kafka/pull/9072#discussion_r462624834



##
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+/**
+ * The {@link TokenBucket} is a {@link SampledStat} implementing a token 
bucket that can be used
+ * in conjunction with a {@link Rate} to enforce a quota.
+ *
+ * A token bucket accumulates tokens with a constant rate R, one token per 1/R 
second, up to a
+ * maximum burst size B. The burst size essentially means that we keep 
permission to do a unit of
+ * work for up to B / R seconds.
+ *
+ * The {@link TokenBucket} adapts this to fit within a {@link SampledStat}. It 
accumulates tokens
+ * in chunks of Q units by default (sample length * quota) instead of one 
token every 1/R second
+ * and expires in chunks of Q units too (when the oldest sample is expired).
+ *
+ * Internally, we achieve this behavior by not completing the current sample 
until we fill that
+ * sample up to Q (used all credits). Samples are filled up one after the 
others until the maximum
+ * number of samples is reached. If it is not possible to created a new 
sample, we accumulate in
+ * the last one until a new one can be created. The over used credits are 
spilled over to the new
+ * sample at when it is created. Every time a sample is purged, Q credits are 
made available.
+ *
+ * It is important to note that the maximum burst is not enforced in the class 
and depends on
+ * how the quota is enforced in the {@link Rate}.
+ */
+public class TokenBucket extends SampledStat {
+
+private final TimeUnit unit;
+
+/**
+ * Instantiates a new TokenBucket that works by default with a Quota 
{@link MetricConfig#quota()}
+ * in {@link TimeUnit#SECONDS}.
+ */
+public TokenBucket() {
+this(TimeUnit.SECONDS);
+}
+
+/**
+ * Instantiates a new TokenBucket that works with the provided time unit.
+ *
+ * @param unit The time unit of the Quota {@link MetricConfig#quota()}
+ */
+public TokenBucket(TimeUnit unit) {
+super(0);
+this.unit = unit;
+}
+
+@Override
+public void record(MetricConfig config, double value, long timeMs) {

Review comment:
   Just a high-level comment on the approach. This approach tries to spread 
a recorded value across multiple samples if the sample-level quota is exceeded. 
While this matches the token bucket behavior for quota, it changes the behavior 
when we observe the value of the measurable. Currently, we record a full value 
in the current Sample. When we observe the value of the measurable, the effect 
of this value lasts for the number of samples, after which the value 
rolls out and no longer impacts the observed measurable. With this change, if 
we record a large value, the observed effect of the value could last much 
longer than the number of samples. For example, if we have 10 1-sec samples 
each with a quota of 1 and we record a value of 1000, the effect of this value 
will last for 1000 secs instead of 10 secs from the observability perspective. 
This may cause some confusion since we don't quite know when an event actually 
occurred.
   
   I was thinking about an alternative approach that decouples the 
recording/observation of the measurable from quota calculation. Here is the 
rough idea. We create a customized SampledStat that records new values in a 
single sample as it is.  In addition, it maintains an accumulated available 
credit. As time advances, we add new credits based on the quota rate, capped by 
samples * perSampleQuota. When a value is recorded, we deduct the value from 
the credit and allow the credit to go below 0. We change the 
Sensor.checkQuotas() logic such that if the customized SampledStat is used, we 
throw QuotaViolationException if credit is < 0. This preserves the current 
behavior for observability, but 
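
   A toy sketch of the credit accounting described above (standalone code, not Kafka's actual SampledStat/Sensor API; the refill cap follows the samples * perSampleQuota description):
   
   ```java
   // Credits refill at the quota rate, capped at samples * perSampleQuota.
   // Recording deducts credits and may go negative; the quota counts as
   // violated while credits are below zero (the checkQuotas() analogue).
   public class CreditQuota {
       private final double quotaPerSec;  // allowed units per second
       private final double maxCredits;   // cap: samples * perSampleQuota
       private double credits;
       private long lastMs;
   
       public CreditQuota(double quotaPerSec, int samples, long sampleLengthMs, long nowMs) {
           this.quotaPerSec = quotaPerSec;
           this.maxCredits = samples * quotaPerSec * (sampleLengthMs / 1000.0);
           this.credits = maxCredits;
           this.lastMs = nowMs;
       }
   
       public synchronized void record(double value, long nowMs) {
           refill(nowMs);
           credits -= value;              // allowed to go below zero, as described
       }
   
       public synchronized boolean quotaViolated(long nowMs) {
           refill(nowMs);
           return credits < 0;
       }
   
       private void refill(long nowMs) {
           credits = Math.min(maxCredits, credits + quotaPerSec * (nowMs - lastMs) / 1000.0);
           lastMs = nowMs;
       }
   }
   ```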

[GitHub] [kafka] dielhennr opened a new pull request #9101: KAFKA-10325: KIP-649 implementation

2020-07-30 Thread GitBox


dielhennr opened a new pull request #9101:
URL: https://github.com/apache/kafka/pull/9101


   This is the initial implementation of 
[KIP-649](https://cwiki.apache.org/confluence/pages/resumedraft.action?draftId=158869615&draftShareId=c349fbe8-7aa8-4fde-a8e4-8d719cda3b9a&;), 
which implements dynamic client configuration. It is still a work in 
progress.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #9095: KAFKA-10321: fix infinite blocking for global stream thread startup

2020-07-30 Thread GitBox


guozhangwang commented on a change in pull request #9095:
URL: https://github.com/apache/kafka/pull/9095#discussion_r462589015



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
##
@@ -173,6 +178,18 @@ public boolean stillRunning() {
 }
 }
 
+public boolean inErrorState() {
+synchronized (stateLock) {
+return state.inErrorState();
+}
+}
+
+public boolean stillInitializing() {
+synchronized (stateLock) {
+return !state.isRunning() && !state.inErrorState();

Review comment:
   Why not just `state.CREATED` as we are excluding three out of four 
states here?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-7540) Flaky Test ConsumerBounceTest#testClose

2020-07-30 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167720#comment-17167720
 ] 

Bruno Cadonna commented on KAFKA-7540:
--

https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/7618

{code:java}
20:08:12 kafka.api.ConsumerBounceTest > testClose FAILED
20:08:12 java.lang.AssertionError: Assignment did not complete on time
20:08:12 at org.junit.Assert.fail(Assert.java:89)
20:08:12 at org.junit.Assert.assertTrue(Assert.java:42)
20:08:12 at 
kafka.api.ConsumerBounceTest.checkClosedState(ConsumerBounceTest.scala:486)
20:08:12 at 
kafka.api.ConsumerBounceTest.checkCloseWithCoordinatorFailure(ConsumerBounceTest.scala:257)
20:08:12 at 
kafka.api.ConsumerBounceTest.testClose(ConsumerBounceTest.scala:220)
{code}


> Flaky Test ConsumerBounceTest#testClose
> ---
>
> Key: KAFKA-7540
> URL: https://issues.apache.org/jira/browse/KAFKA-7540
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 2.2.0
>Reporter: John Roesler
>Assignee: Jason Gustafson
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.7.0, 2.6.1
>
>
> Observed on Java 8: 
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/17314/testReport/junit/kafka.api/ConsumerBounceTest/testClose/]
>  
> Stacktrace:
> {noformat}
> java.lang.ArrayIndexOutOfBoundsException: -1
>   at 
> kafka.integration.KafkaServerTestHarness.killBroker(KafkaServerTestHarness.scala:146)
>   at 
> kafka.api.ConsumerBounceTest.checkCloseWithCoordinatorFailure(ConsumerBounceTest.scala:238)
>   at kafka.api.ConsumerBounceTest.testClose(ConsumerBounceTest.scala:211)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAda

[jira] [Commented] (KAFKA-9013) Flaky Test MirrorConnectorsIntegrationTest#testReplication

2020-07-30 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167726#comment-17167726
 ] 

Bruno Cadonna commented on KAFKA-9013:
--

https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3608

{code:java}
java.lang.RuntimeException: Could not find enough records. found 0, expected 100
at 
org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.consume(EmbeddedKafkaCluster.java:435)
at 
org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication(MirrorConnectorsIntegrationTest.java:221)
{code}


> Flaky Test MirrorConnectorsIntegrationTest#testReplication
> --
>
> Key: KAFKA-9013
> URL: https://issues.apache.org/jira/browse/KAFKA-9013
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Bruno Cadonna
>Priority: Major
>  Labels: flaky-test
>
> h1. Stacktrace:
> {code:java}
> java.lang.AssertionError: Condition not met within timeout 2. Offsets not 
> translated downstream to primary cluster.
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:354)
>   at 
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication(MirrorConnectorsIntegrationTest.java:239)
> {code}
> h1. Standard Error
> {code}
> Standard Error
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> registered in SERVER runtime does not implement any provider interfaces 
> applicable in the SERVER runtime. Due to constraint configuration problems 
> the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will 
> be ignored. 
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource registered in 
> SERVER runtime does not implement any provider interfaces applicable in the 
> SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource will be 
> ignored. 
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered 
> in SERVER runtime does not implement any provider interfaces applicable in 
> the SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be 
> ignored. 
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.RootResource registered in 
> SERVER runtime does not implement any provider interfaces applicable in the 
> SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.RootResource will be ignored. 
> Oct 09, 2019 11:32:01 PM org.glassfish.jersey.internal.Errors logErrors
> WARNING: The following warnings have been detected: WARNING: The 
> (sub)resource method listLoggers in 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains 
> empty path annotation.
> WARNING: The (sub)resource method listConnectors in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains 
> empty path annotation.
> WARNING: The (sub)resource method createConnector in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains 
> empty path annotation.
> WARNING: The (sub)resource method listConnectorPlugins in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> contains empty path annotation.
> WARNING: The (sub)resource method serverInfo in 
> org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty 
> path annotation.
> Oct 09, 2019 11:32:02 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> registered in SERVER runtime does not implement any provider interfaces 
> applicable in the SERVER runtime. Due to constraint configuration problems 
> the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will 
> be ignored. 
> Oct 09, 2019 11:32:02 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered 
> in SERVER runtime does not implemen

[jira] [Commented] (KAFKA-10255) Fix flaky testOneWayReplicationWithAutorOffsetSync1 test

2020-07-30 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167727#comment-17167727
 ] 

Bruno Cadonna commented on KAFKA-10255:
---

https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3608

{code:java}
java.lang.AssertionError: consumer record size is not zero expected:<0> but 
was:<4>
{code}


> Fix flaky testOneWayReplicationWithAutorOffsetSync1 test
> 
>
> Key: KAFKA-10255
> URL: https://issues.apache.org/jira/browse/KAFKA-10255
> Project: Kafka
>  Issue Type: Test
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk14/runs/279/log/?start=0
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > 
> testOneWayReplicationWithAutorOffsetSync1 STARTED
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1
>  failed, log available in 
> /home/jenkins/jenkins-slave/workspace/kafka-trunk-jdk14/connect/mirror/build/reports/testOutput/org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1.test.stdout
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > 
> testOneWayReplicationWithAutorOffsetSync1 FAILED
>  java.lang.AssertionError: consumer record size is not zero expected:<0> but 
> was:<2>
>  at org.junit.Assert.fail(Assert.java:89)
>  at org.junit.Assert.failNotEquals(Assert.java:835)
>  at org.junit.Assert.assertEquals(Assert.java:647)
>  at 
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1(MirrorConnectorsIntegrationTest.java:349)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cadonna commented on pull request #9087: HOTFIX: Set session timeout and heartbeat interval to default to decrease flakiness

2020-07-30 Thread GitBox


cadonna commented on pull request #9087:
URL: https://github.com/apache/kafka/pull/9087#issuecomment-666202514







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9087: HOTFIX: Set session timeout and heartbeat interval to default to decrease flakiness

2020-07-30 Thread GitBox


chia7712 commented on pull request #9087:
URL: https://github.com/apache/kafka/pull/9087#issuecomment-666205937


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #9072: KAFKA-10162; Make the rate based quota behave more like a Token Bucket (KIP-599, Part III)

2020-07-30 Thread GitBox


dajac commented on a change in pull request #9072:
URL: https://github.com/apache/kafka/pull/9072#discussion_r462846252



##
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+/**
+ * The {@link TokenBucket} is a {@link SampledStat} implementing a token 
bucket that can be used
+ * in conjunction with a {@link Rate} to enforce a quota.
+ *
+ * A token bucket accumulates tokens with a constant rate R, one token per 1/R 
second, up to a
+ * maximum burst size B. The burst size essentially means that we keep 
permission to do a unit of
+ * work for up to B / R seconds.
+ *
+ * The {@link TokenBucket} adapts this to fit within a {@link SampledStat}. It 
accumulates tokens
+ * in chunks of Q units by default (sample length * quota) instead of one 
token every 1/R second
+ * and expires in chunks of Q units too (when the oldest sample is expired).
+ *
+ * Internally, we achieve this behavior by not completing the current sample 
until we fill that
+ * sample up to Q (used all credits). Samples are filled up one after the 
others until the maximum
+ * number of samples is reached. If it is not possible to created a new 
sample, we accumulate in
+ * the last one until a new one can be created. The over used credits are 
spilled over to the new
+ * sample at when it is created. Every time a sample is purged, Q credits are 
made available.
+ *
+ * It is important to note that the maximum burst is not enforced in the class 
and depends on
+ * how the quota is enforced in the {@link Rate}.
+ */
+public class TokenBucket extends SampledStat {
+
+private final TimeUnit unit;
+
+/**
+ * Instantiates a new TokenBucket that works by default with a Quota 
{@link MetricConfig#quota()}
+ * in {@link TimeUnit#SECONDS}.
+ */
+public TokenBucket() {
+this(TimeUnit.SECONDS);
+}
+
+/**
+ * Instantiates a new TokenBucket that works with the provided time unit.
+ *
+ * @param unit The time unit of the Quota {@link MetricConfig#quota()}
+ */
+public TokenBucket(TimeUnit unit) {
+super(0);
+this.unit = unit;
+}
+
+@Override
+public void record(MetricConfig config, double value, long timeMs) {

Review comment:
   @junrao Thanks for your comment. Your observation is correct. This is 
because we spill over the amount above the quota into any newly created sample. 
So @apovzner's example would actually end up like this: [5, 18]. @apovzner I 
think you mentioned once another way to do this that does not require 
spilling over the remainder.
   
   Your remark regarding the observability is right. This is true as soon as we 
deviate from a sampled rate, regardless of the implementation that we may 
choose eventually. Separating concerns is indeed a good idea if we can. 
Regarding your suggested approach, I think that may work. As @apovzner said, 
we will need to change the way the throttle time is computed so that it relies 
on the amount of credits rather than on the rate. Another concern is that the 
credits won't be observable either, so we may keep the correct rate but still 
not understand why a client is or isn't throttled if we can't observe the amount 
of credits used/left.
   
   If we want to separate concerns, having both a sampled Rate and a Token 
Bucket working side by side would be the best. The Rate would continue to 
provide the actual Rate as of today and the Token Bucket would be used to 
enforce the quota. We could expose the amount of credits in the bucket via a 
new metric. One way to achieve this would be to have two MeasurableStats within 
the Sensor: the Rate and the Token Bucket. We would still need to have an 
adapted version of the Token Bucket that works with our current configs (quota, 
samples, window). I implemented one to compare with the current approach a few 
weeks ago: 
https://github.com/dajac/kafka/blob/1
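
   A minimal sketch of the "two stats side by side" idea under the assumptions above (sensor and metric names are illustrative; TokenBucket is the class added in this PR):
   
   ```java
   import org.apache.kafka.common.metrics.MetricConfig;
   import org.apache.kafka.common.metrics.Metrics;
   import org.apache.kafka.common.metrics.Quota;
   import org.apache.kafka.common.metrics.Sensor;
   import org.apache.kafka.common.metrics.stats.Rate;
   import org.apache.kafka.common.metrics.stats.TokenBucket;
   
   public class SideBySideQuotaSketch {
       public static void main(String[] args) throws Exception {
           try (Metrics metrics = new Metrics()) {
               // The Rate keeps reporting the observed rate as it does today, while the
               // TokenBucket stat would be the one the quota check is tied to.
               MetricConfig config = new MetricConfig().quota(Quota.upperBound(10.0));
               Sensor sensor = metrics.sensor("byte-rate-sensor", config);
               sensor.add(metrics.metricName("byte-rate", "quota-metrics"), new Rate());
               sensor.add(metrics.metricName("remaining-tokens", "quota-metrics"), new TokenBucket());
               sensor.record(5.0);  // recorded into both stats
           }
       }
   }
   ```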

[GitHub] [kafka] mimaison commented on pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()

2020-07-30 Thread GitBox


mimaison commented on pull request #9007:
URL: https://github.com/apache/kafka/pull/9007#issuecomment-666239450


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()

2020-07-30 Thread GitBox


mimaison commented on pull request #9007:
URL: https://github.com/apache/kafka/pull/9007#issuecomment-666256788


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics

2020-07-30 Thread GitBox


cadonna commented on a change in pull request #9094:
URL: https://github.com/apache/kafka/pull/9094#discussion_r462881933



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
##
@@ -668,6 +671,9 @@ private void checkKeyValueStoreMetrics(final String 
group0100To24,
 checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_CURRENT, 0);
 checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_AVG, 0);
 checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_MAX, 0);
+checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_AVG, 
expectedNumberofE2ELatencyMetrics);
+checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MIN, 
expectedNumberofE2ELatencyMetrics);
+checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MAX, 
expectedNumberofE2ELatencyMetrics);

Review comment:
   Sorry, I made a mistake here. We should not put new metrics into old groups. 
To fix this test, I think you need to adapt the filter on line 618 to let 
through all metrics with groups that relate to KV state stores. See 
`checkWindowStoreAndSuppressionBufferMetrics()` for an example.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics

2020-07-30 Thread GitBox


cadonna commented on a change in pull request #9094:
URL: https://github.com/apache/kafka/pull/9094#discussion_r462882926



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
##
@@ -443,6 +447,25 @@ public static Sensor suppressionBufferSizeSensor(final 
String threadId,
 );
 }
 
+public static Sensor e2ELatencySensor(final String threadId,
+  final String taskId,
+  final String storeType,
+  final String storeName,
+  final StreamsMetricsImpl 
streamsMetrics) {
+final Sensor sensor = streamsMetrics.storeLevelSensor(threadId, 
taskId, storeName, RECORD_E2E_LATENCY, RecordingLevel.TRACE);
+final Map tagMap = 
streamsMetrics.storeLevelTagMap(threadId, taskId, storeType, storeName);
+addAvgAndMinAndMaxToSensor(
+sensor,
+STATE_STORE_LEVEL_GROUP,

Review comment:
   I just realized that we should not put new metrics into old groups. Your 
code is fine. Do not use `stateStoreLevelGroup()`! Sorry for the confusion.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics

2020-07-30 Thread GitBox


cadonna commented on a change in pull request #9094:
URL: https://github.com/apache/kafka/pull/9094#discussion_r462883347



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetricsTest.java
##
@@ -327,6 +327,38 @@ public void shouldGetExpiredWindowRecordDropSensor() {
 assertThat(sensor, is(expectedSensor));
 }
 
+@Test
+public void shouldGetRecordE2ELatencySensor() {
+final String metricName = "record-e2e-latency";
+
+final String e2eLatencyDescription =
+"end-to-end latency of a record, measuring by comparing the record 
timestamp with the "
++ "system time when it has been fully processed by the node";
+final String descriptionOfAvg = "The average " + e2eLatencyDescription;
+final String descriptionOfMin = "The minimum " + e2eLatencyDescription;
+final String descriptionOfMax = "The maximum " + e2eLatencyDescription;
+
+expect(streamsMetrics.storeLevelSensor(THREAD_ID, TASK_ID, STORE_NAME, 
metricName, RecordingLevel.TRACE))
+.andReturn(expectedSensor);
+expect(streamsMetrics.storeLevelTagMap(THREAD_ID, TASK_ID, STORE_TYPE, 
STORE_NAME)).andReturn(storeTagMap);
+StreamsMetricsImpl.addAvgAndMinAndMaxToSensor(
+expectedSensor,
+STORE_LEVEL_GROUP,

Review comment:
   I just realized that we should not put new metrics into old groups. Your 
code is fine. Do not use instance variable `storeLevelGroup`! Sorry for the 
confusion.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol

2020-07-30 Thread GitBox


mimaison commented on pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#issuecomment-666310163


   @dajac Ok, I've removed the `Optional` to `Option` changes



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #9087: HOTFIX: Set session timeout and heartbeat interval to default to decrease flakiness

2020-07-30 Thread GitBox


cadonna commented on pull request #9087:
URL: https://github.com/apache/kafka/pull/9087#issuecomment-666311943


   Java 14 failed due to the following failing test:
   
   ```
   org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest > 
shouldUpgradeFromEosAlphaToEosBeta[true]
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10327) Make flush after some count of putted records in SinkTask

2020-07-30 Thread Pavel Kuznetsov (Jira)
Pavel Kuznetsov created KAFKA-10327:
---

 Summary: Make flush after some count of putted records in SinkTask
 Key: KAFKA-10327
 URL: https://issues.apache.org/jira/browse/KAFKA-10327
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 2.5.0
Reporter: Pavel Kuznetsov


In the current version of Kafka Connect, all records accumulated with the 
SinkTask.put method are flushed to the target system in a time-based manner, so 
data is flushed and offsets are committed every offset.flush.timeout.ms (default 
is 6) ms.

But you can't control the number of messages you receive from Kafka between two 
flushes. This may cause out-of-memory errors, because the in-memory buffer can 
grow a lot. 

I suggest adding out-of-the-box support for count-based flushing to Kafka 
Connect. It would require a new configuration parameter (offset.flush.count, for 
example). The number of records sent to SinkTask.put would be counted, and if 
this amount is greater than the value of offset.flush.count, SinkTask.flush 
would be called and offsets committed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on pull request #9087: HOTFIX: Set session timeout and heartbeat interval to default to decrease flakiness

2020-07-30 Thread GitBox


chia7712 commented on pull request #9087:
URL: https://github.com/apache/kafka/pull/9087#issuecomment-666325276


   ```EosBetaUpgradeIntegrationTest``` passes on my local machine. retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9102: KAFKA-10326 Both serializer and deserializer should be able to see th…

2020-07-30 Thread GitBox


chia7712 commented on pull request #9102:
URL: https://github.com/apache/kafka/pull/9102#issuecomment-666325552


   ```EosBetaUpgradeIntegrationTest``` is flaky... retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()

2020-07-30 Thread GitBox


mimaison commented on pull request #9007:
URL: https://github.com/apache/kafka/pull/9007#issuecomment-666353178


   Failures look unrelated:
   - org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest > 
shouldUpgradeFromEosAlphaToEosBeta[true] FAILED
   - org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > 
testOneWayReplicationWithAutorOffsetSync1



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison merged pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()

2020-07-30 Thread GitBox


mimaison merged pull request #9007:
URL: https://github.com/apache/kafka/pull/9007


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-10120) DescribeLogDirsResult exposes internal classes

2020-07-30 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-10120.

Fix Version/s: 2.7
   Resolution: Fixed

> DescribeLogDirsResult exposes internal classes
> --
>
> Key: KAFKA-10120
> URL: https://issues.apache.org/jira/browse/KAFKA-10120
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tom Bentley
>Assignee: Tom Bentley
>Priority: Minor
> Fix For: 2.7
>
>
> DescribeLogDirsResult (returned by AdminClient#describeLogDirs(Collection)) 
> exposes a number of internal types:
>  * {{DescribeLogDirsResponse.LogDirInfo}}
>  * {{DescribeLogDirsResponse.ReplicaInfo}}
>  * {{Errors}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mimaison commented on a change in pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

2020-07-30 Thread GitBox


mimaison commented on a change in pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#discussion_r462989546



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -345,15 +342,24 @@ private void 
waitForConsumerGroupOffsetSync(Consumer consumer, L
 }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not 
complete in time");
 }
 
+private void waitForConsumingAllRecords(Consumer consumer) 
throws InterruptedException {
+final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
+waitForCondition(() -> {
+ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
+consumer.commitSync();
+return NUM_RECORDS_PRODUCED == 
totalConsumedRecords.addAndGet(records.count());
+}, RECORD_CONSUME_DURATION_MS, "Consumer cannot consume all the 
records in time");
+}
+
 @Test
 public void testOneWayReplicationWithAutoOffsetSync() throws 
InterruptedException {
 
 // create consumers before starting the connectors so we don't need to 
wait for discovery
-Consumer consumer1 = 
primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-"group.id", "consumer-group-1"), "test-topic-1");
-consumer1.poll(Duration.ofMillis(500));
-consumer1.commitSync();
-consumer1.close();
+try (Consumer consumer1 = 
primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+"group.id", "consumer-group-1"), "test-topic-1")) {
+// we need to wait for consuming all the records for MM2 
replicaing the expected offsets

Review comment:
   `replicaing` -> `replicating`

##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -345,15 +342,24 @@ private void 
waitForConsumerGroupOffsetSync(Consumer consumer, L
 }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not 
complete in time");
 }
 
+private void waitForConsumingAllRecords(Consumer consumer) 
throws InterruptedException {
+final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
+waitForCondition(() -> {
+ConsumerRecords records = consumer.poll(Duration.ofMillis(500));

Review comment:
   Can we add the types `` to `ConsumerRecords`?

##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -387,11 +393,11 @@ public void testOneWayReplicationWithAutoOffsetSync() 
throws InterruptedExceptio
 }
 
 // create a consumer at primary cluster to consume the new topic
-consumer1 = 
primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-"group.id", "consumer-group-1"), "test-topic-2");
-consumer1.poll(Duration.ofMillis(500));
-consumer1.commitSync();
-consumer1.close();
+try (Consumer consumer1 = 
primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+"group.id", "consumer-group-1"), "test-topic-2")) {
+// we need to wait for consuming all the records for MM2 
replicaing the expected offsets

Review comment:
   `replicaing` -> `replicating`

##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -345,15 +342,24 @@ private void 
waitForConsumerGroupOffsetSync(Consumer consumer, L
 }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not 
complete in time");
 }
 
+private void waitForConsumingAllRecords(Consumer consumer) 
throws InterruptedException {
+final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
+waitForCondition(() -> {
+ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
+consumer.commitSync();

Review comment:
   We can move that line after the `waitForCondition()` block to just 
commit once all records have been consumed.
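
   For illustration, the change suggested above would look roughly as follows (a 
sketch that reuses the test's existing helpers and constants; the `byte[]` 
generic types are assumed):

   ```java
   private void waitForConsumingAllRecords(Consumer<byte[], byte[]> consumer) throws InterruptedException {
       final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
       waitForCondition(() -> {
           ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
           return NUM_RECORDS_PRODUCED == totalConsumedRecords.addAndGet(records.count());
       }, RECORD_CONSUME_DURATION_MS, "Consumer cannot consume all the records in time");
       // Commit once, after all records have been consumed.
       consumer.commitSync();
   }
   ```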

##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -345,15 +342,24 @@ private void 
waitForConsumerGroupOffsetSync(Consumer consumer, L
 }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not 
complete in time");
 }
 
+private void waitForConsumingAllRecords(Consumer consumer) 
throws InterruptedException {
+final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
+waitForCondition(() -> {
+ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
+consumer.commitSync();
+return NUM_RECORDS_PRODUCED == 
totalConsumedRecords.addAndGet(records.count());
+}, RECORD_CONSUME_DURATION_MS, "Consumer cannot consume all the 
records in time");

Review comment:
   nit: The 

[jira] [Assigned] (KAFKA-10048) Possible data gap for a consumer after a failover when using MM2

2020-07-30 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison reassigned KAFKA-10048:
--

Assignee: Andre Araujo

> Possible data gap for a consumer after a failover when using MM2
> 
>
> Key: KAFKA-10048
> URL: https://issues.apache.org/jira/browse/KAFKA-10048
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0
>Reporter: Andre Araujo
>Assignee: Andre Araujo
>Priority: Major
>
> I've been looking at some MM2 scenarios and identified a situation where 
> consumers can miss consuming some data in the event of a failover.
>  
> When a consumer subscribes to a topic for the first time and commits offsets, 
> the offsets for every existing partition of that topic will be saved to the 
> cluster's {{__consumer_offset}} topic. Even if a partition is completely 
> empty, the offset {{0}} will still be saved for the consumer's consumer group.
>  
> When MM2 is replicating the checkpoints to the remote cluster, though, it 
> [ignores anything that has an offset equal to 
> zero|https://github.com/apache/kafka/blob/856e36651203b03bf9a6df2f2d85a356644cbce3/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L135],
>  replicating offsets only for partitions that contain data.
>  
> This can lead to a gap in the data consumed by consumers in the following 
> scenario:
>  # Topic is created on the source cluster.
>  # MM2 is configured to replicate the topic and consumer groups
>  # Producer starts to produce data to the source topic but for some reason 
> some partitions do not get data initially, while others do (skewed keyed 
> messages or bad luck)
>  # Consumers start to consume data from that topic and their consumer groups' 
> offsets are replicated to the target cluster, *but only for partitions that 
> contain data*. The consumers are using the default setting auto.offset.reset 
> = latest.
>  # A consumer failover to the second cluster is performed (for whatever 
> reason), and the offset translation steps are completed. The consumers are not 
> restarted yet.
>  # The producers continue to produce data to the source cluster topic and now 
> produce data to the partitions that were empty before.
>  # *After* the producers start producing data, consumers are started on the 
> target cluster and start consuming.
> For the partitions that already had data before the failover, everything 
> works fine. The consumer offsets will have been translated correctly and the 
> consumers will start consuming from the correct position.
> For the partitions that were empty before the failover, though, any data 
> written by the producers to those partitions *after the failover but before 
> the consumers start* will be completely missed, since the consumers will jump 
> straight to the latest offset when they start due to the lack of a zero 
> offset stored locally on the target cluster.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10048) Possible data gap for a consumer after a failover when using MM2

2020-07-30 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison reassigned KAFKA-10048:
--

Assignee: (was: Mickael Maison)

> Possible data gap for a consumer after a failover when using MM2
> 
>
> Key: KAFKA-10048
> URL: https://issues.apache.org/jira/browse/KAFKA-10048
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0
>Reporter: Andre Araujo
>Priority: Major
>
> I've been looking at some MM2 scenarios and identified a situation where 
> consumers can miss consuming some data in the event of a failover.
>  
> When a consumer subscribes to a topic for the first time and commits offsets, 
> the offsets for every existing partition of that topic will be saved to the 
> cluster's {{__consumer_offset}} topic. Even if a partition is completely 
> empty, the offset {{0}} will still be saved for the consumer's consumer group.
>  
> When MM2 is replicating the checkpoints to the remote cluster, though, it 
> [ignores anything that has an offset equal to 
> zero|https://github.com/apache/kafka/blob/856e36651203b03bf9a6df2f2d85a356644cbce3/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L135],
>  replicating offsets only for partitions that contain data.
>  
> This can lead to a gap in the data consumed by consumers in the following 
> scenario:
>  # Topic is created on the source cluster.
>  # MM2 is configured to replicate the topic and consumer groups
>  # Producer starts to produce data to the source topic but for some reason 
> some partitions do not get data initially, while others do (skewed keyed 
> messages or bad luck)
>  # Consumers start to consume data from that topic and their consumer groups' 
> offsets are replicated to the target cluster, *but only for partitions that 
> contain data*. The consumers are using the default setting auto.offset.reset 
> = latest.
>  # A consumer failover to the second cluster is performed (for whatever 
> reason), and the offset translation steps are completed. The consumers are not 
> restarted yet.
>  # The producers continue to produce data to the source cluster topic and now 
> produce data to the partitions that were empty before.
>  # *After* the producers start producing data, consumers are started on the 
> target cluster and start consuming.
> For the partitions that already had data before the failover, everything 
> works fine. The consumer offsets will have been translated correctly and the 
> consumers will start consuming from the correct position.
> For the partitions that were empty before the failover, though, any data 
> written by the producers to those partitions *after the failover but before 
> the consumers start* will be completely missed, since the consumers will jump 
> straight to the latest offset when they start due to the lack of a zero 
> offset stored locally on the target cluster.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10048) Possible data gap for a consumer after a failover when using MM2

2020-07-30 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison reassigned KAFKA-10048:
--

Assignee: Mickael Maison

> Possible data gap for a consumer after a failover when using MM2
> 
>
> Key: KAFKA-10048
> URL: https://issues.apache.org/jira/browse/KAFKA-10048
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0
>Reporter: Andre Araujo
>Assignee: Mickael Maison
>Priority: Major
>
> I've been looking at some MM2 scenarios and identified a situation where 
> consumers can miss consuming some data in the event of a failover.
>  
> When a consumer subscribes to a topic for the first time and commits offsets, 
> the offsets for every existing partition of that topic will be saved to the 
> cluster's {{__consumer_offset}} topic. Even if a partition is completely 
> empty, the offset {{0}} will still be saved for the consumer's consumer group.
>  
> When MM2 is replicating the checkpoints to the remote cluster, though, it 
> [ignores anything that has an offset equal to 
> zero|https://github.com/apache/kafka/blob/856e36651203b03bf9a6df2f2d85a356644cbce3/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L135],
>  replicating offsets only for partitions that contain data.
>  
> This can lead to a gap in the data consumed by consumers in the following 
> scenario:
>  # Topic is created on the source cluster.
>  # MM2 is configured to replicate the topic and consumer groups
>  # Producer starts to produce data to the source topic but for some reason 
> some partitions do not get data initially, while others do (skewed keyed 
> messages or bad luck)
>  # Consumers start to consume data from that topic and their consumer groups' 
> offsets are replicated to the target cluster, *but only for partitions that 
> contain data*. The consumers are using the default setting auto.offset.reset 
> = latest.
>  # A consumer failover to the second cluster is performed (for whatever 
> reason), and the offset translation steps are completed. The consumers are not 
> restarted yet.
>  # The producers continue to produce data to the source cluster topic and now 
> produce data to the partitions that were empty before.
>  # *After* the producers start producing data, consumers are started on the 
> target cluster and start consuming.
> For the partitions that already had data before the failover, everything 
> works fine. The consumer offsets will have been translated correctly and the 
> consumers will start consuming from the correct position.
> For the partitions that were empty before the failover, though, any data 
> written by the producers to those partitions *after the failover but before 
> the consumers start* will be completely missed, since the consumers will jump 
> straight to the latest offset when they start due to the lack of a zero 
> offset stored locally on the target cluster.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mumrah commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

2020-07-30 Thread GitBox


mumrah commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-666374043


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-07-30 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r463026843



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##
@@ -132,16 +135,19 @@

 final boolean stateCreated,

 final StoreBuilder storeBuilder,

 final Windows windows,
+   
 final SlidingWindows slidingWindows,

 final SessionWindows sessionWindows,

 final Merger sessionMerger) {
 
 final ProcessorSupplier kStreamAggregate;
 
-if (windows == null && sessionWindows == null) {
+if (windows == null && slidingWindows == null && sessionWindows == 
null) {
 kStreamAggregate = new KStreamAggregate<>(storeBuilder.name(), 
initializer, aggregator);
-} else if (windows != null && sessionWindows == null) {
+} else if (windows != null && slidingWindows == null && sessionWindows 
== null) {
 kStreamAggregate = new KStreamWindowAggregate<>(windows, 
storeBuilder.name(), initializer, aggregator);
-} else if (windows == null && sessionMerger != null) {
+} else if (windows == null && slidingWindows != null && sessionWindows 
== null) {
+kStreamAggregate = new 
KStreamSlidingWindowAggregate<>(slidingWindows, storeBuilder.name(), 
initializer, aggregator);
+} else if (windows == null && slidingWindows == null && sessionMerger 
!= null) {

Review comment:
   The original just had the check for `sessionMerger != null`; are there 
scenarios where the sessionMerger would be null but the sessionWindows 
wouldn't? I did think it was kind of inconsistent to check `sessionMerger` just 
that one time and check `sessionWindows` the other times, so maybe it was a 
mistake





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

2020-07-30 Thread GitBox


showuon commented on a change in pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#discussion_r463028636



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -345,15 +342,24 @@ private void 
waitForConsumerGroupOffsetSync(Consumer consumer, L
 }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not 
complete in time");
 }
 
+private void waitForConsumingAllRecords(Consumer consumer) 
throws InterruptedException {
+final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
+waitForCondition(() -> {
+ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
+consumer.commitSync();

Review comment:
   Good suggestion! Thanks.

##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -345,15 +342,24 @@ private void 
waitForConsumerGroupOffsetSync(Consumer consumer, L
 }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not 
complete in time");
 }
 
+private void waitForConsumingAllRecords(Consumer consumer) 
throws InterruptedException {
+final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
+waitForCondition(() -> {
+ConsumerRecords records = consumer.poll(Duration.ofMillis(500));

Review comment:
   Done. Thanks.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

2020-07-30 Thread GitBox


showuon commented on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-666395292


   @mimaison , thanks for your good suggestion! I've updated it in this commit: 
https://github.com/apache/kafka/pull/9029/commits/8bc4a543dda6ddd90d752f7e6a64c63d85a1de3f.
 Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-07-30 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r463036048



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {

Review comment:
   done!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-07-30 Thread GitBox


chia7712 commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-666398632


   > 2 system test failures in the latest PR
   
those 2 failed tests are flaky on my local machine and there are issues/PRs 
related to them.
   
   @junrao @ijuma @hachikuji @rajinisivaram please take a look.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-07-30 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r463039985



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
##
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Set;
+
+import static 
org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
+import static 
org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME;
+
+public class SlidingWindowedKStreamImpl extends AbstractStream 
implements TimeWindowedKStream {
+private final SlidingWindows windows;
+private final GroupedStreamAggregateBuilder aggregateBuilder;
+
+SlidingWindowedKStreamImpl(final SlidingWindows windows,
+final InternalStreamsBuilder builder,
+final Set subTopologySourceNodes,
+final String name,
+final Serde keySerde,
+final Serde valueSerde,
+final GroupedStreamAggregateBuilder 
aggregateBuilder,
+final StreamsGraphNode streamsGraphNode) {
+super(name, keySerde, valueSerde, subTopologySourceNodes, 
streamsGraphNode, builder);
+this.windows = Objects.requireNonNull(windows, "windows can't be 
null");
+this.aggregateBuilder = aggregateBuilder;
+}
+
+@Override
+public KTable, Long> count() {
+return count(NamedInternal.empty());
+}
+
+@Override
+public KTable, Long> count(final Named named) {
+return doCount(named, Materialized.with(keySerde, Serdes.Long()));
+}
+
+
+@Override
+public KTable, Long> count(final Materialized> materialized) {
+return count(NamedInternal.empty(), materialized);
+}
+
+@Override
+public KTable, Long> count(final Named named, final 
Materialized> materialized) {
+Objects.requireNonNull(materialized, "materialized can't be null");
+
+// TODO: remove this when we do a topology-incompatible release
+// we used to burn a topology name here, so we have to keep doing it 
for compatibility
+if (new MaterializedInternal<>(materialized).storeName() == null) {
+builder.newStoreName(AGGREGATE_NAME);
+}
+
+return doCount(named, materialized);
+}
+
+private KTable, Long> doCount(final Named named,
+  final Materialized> materialized) {
+final MaterializedInternal> 
materializedInternal =
+new MaterializedInternal<>(materialized, builder, 
AGGREGATE_NAME);
+
+if (materializedInternal.keySerde() == null) {
+materializedInternal.withKeySerde(keySerde);
+}
+if (materializedInternal.valueSerde() == null) {
+materializedInternal.withValueSerde(Serdes.Long());
+}
+
+final String aggregateName = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+
+return aggregateBu

[GitHub] [kafka] dajac commented on pull request #9091: MINOR; Make KafkaAdminClientTest.testDescribeLogDirsPartialFailure and KafkaAdminClientTest.testAlterReplicaLogDirsPartialFailure test more reli

2020-07-30 Thread GitBox


dajac commented on pull request #9091:
URL: https://github.com/apache/kafka/pull/9091#issuecomment-666402566


   rebased to fix conflicts.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-07-30 Thread GitBox


ijuma commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-666403581


   @junrao Thanks for the summary. With regards to the thread pool option, 
would this be used for completion of delayed operations for the group 
coordinator only? When it comes to tuning and monitoring, you're thinking we'd 
have to introduce a config for the number of threads and an `idle` metric?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-07-30 Thread GitBox


ijuma commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r463044371



##
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##
@@ -239,9 +239,13 @@ class GroupMetadataManager(brokerId: Int,
 }
   }
 
+  /**
+   * @return Returning a map of successfully appended topic partitions and a 
flag indicating whether the HWM has been
+   * incremented. The caller ought to complete delayed requests for 
those returned partitions.
+   */
   def storeGroup(group: GroupMetadata,
  groupAssignment: Map[String, Array[Byte]],
- responseCallback: Errors => Unit): Unit = {
+ responseCallback: Errors => Unit): Map[TopicPartition, 
LeaderHwChange] = {

Review comment:
   I raised the point before that it's a bit unusual and unintuitive to 
have both a callback and a return value. Any thoughts on this?

##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -369,6 +370,32 @@ class GroupCoordinator(val brokerId: Int,
 }
   }
 
+  /**
+   * try to complete produce, fetch and delete requests if the HW of partition 
is incremented. Otherwise, we try to complete
+   * only delayed fetch requests.
+   *
+   * Noted that this method may hold a lot of group lock so the caller should 
NOT hold any group lock
+   * in order to avoid deadlock
+   * @param topicPartitions a map containing the partition and a flag indicating 
whether the HWM has been changed
+   */
+  private[group] def completeDelayedRequests(topicPartitions: 
Map[TopicPartition, LeaderHwChange]): Unit =
+topicPartitions.foreach {
+  case (tp, leaderHWIncremented) => leaderHWIncremented match {
+case LeaderHwIncremented$ => 
groupManager.replicaManager.completeDelayedRequests(tp)

Review comment:
   I notice that we are including the `$` here and in a few other places, 
we should not do that.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-07-30 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r463046306



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean reverseIteratorImplemented = false;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+//catch unsupported operation error
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+ 

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-07-30 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r463047237



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean reverseIteratorImplemented = false;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+//catch unsupported operation error
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+ 

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-07-30 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r463056420



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean reverseIteratorImplemented = false;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+//catch unsupported operation error
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+ 

[GitHub] [kafka] dajac commented on a change in pull request #9092: KAFKA-10163; Define `controller_mutation_rate` as a Double instead of a Long

2020-07-30 Thread GitBox


dajac commented on a change in pull request #9092:
URL: https://github.com/apache/kafka/pull/9092#discussion_r463056307



##
File path: core/src/main/scala/kafka/server/DynamicConfig.scala
##
@@ -103,7 +103,7 @@ object DynamicConfig {
   .define(ProducerByteRateOverrideProp, LONG, DefaultProducerOverride, 
MEDIUM, ProducerOverrideDoc)
   .define(ConsumerByteRateOverrideProp, LONG, DefaultConsumerOverride, 
MEDIUM, ConsumerOverrideDoc)
   .define(RequestPercentageOverrideProp, DOUBLE, DefaultRequestOverride, 
MEDIUM, RequestOverrideDoc)
-  .define(ControllerMutationOverrideProp, LONG, DefaultConsumerOverride, 
MEDIUM, ControllerMutationOverrideDoc)
+  .define(ControllerMutationOverrideProp, DOUBLE, DefaultConsumerOverride, 
MEDIUM, ControllerMutationOverrideDoc)

Review comment:
   Good point. I have changed this to use `Int.MaxValue.toDouble` by 
default, like we do for the request quota.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei merged pull request #9097: KAFKA-10319: Skip unknown offsets when computing sum of changelog offsets

2020-07-30 Thread GitBox


vvcephei merged pull request #9097:
URL: https://github.com/apache/kafka/pull/9097


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on pull request #9092: KAFKA-10163; Define `controller_mutation_rate` as a Double instead of a Long

2020-07-30 Thread GitBox


rajinisivaram commented on pull request #9092:
URL: https://github.com/apache/kafka/pull/9092#issuecomment-666428695


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on pull request #9092: KAFKA-10163; Define `controller_mutation_rate` as a Double instead of a Long

2020-07-30 Thread GitBox


rajinisivaram commented on pull request #9092:
URL: https://github.com/apache/kafka/pull/9092#issuecomment-666429262


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-07-30 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r463063406



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean reverseIteratorImplemented = false;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+//catch unsupported operation error
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+ 

[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-07-30 Thread GitBox


chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r463066191



##
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##
@@ -239,9 +239,13 @@ class GroupMetadataManager(brokerId: Int,
 }
   }
 
+  /**
+   * @return Returning a map of successfully appended topic partitions and a 
flag indicating whether the HWM has been
+   * incremented. The caller ought to complete delayed requests for 
those returned partitions.
+   */
   def storeGroup(group: GroupMetadata,
  groupAssignment: Map[String, Array[Byte]],
- responseCallback: Errors => Unit): Unit = {
+ responseCallback: Errors => Unit): Map[TopicPartition, 
LeaderHwChange] = {

Review comment:
   The response was 
https://github.com/apache/kafka/pull/8657#discussion_r452754165
   
   In short, we should have a way of fetching delayed request from partition 
instead of using return value to carry them.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-07-30 Thread GitBox


chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r463066323



##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -369,6 +370,32 @@ class GroupCoordinator(val brokerId: Int,
 }
   }
 
+  /**
+   * Try to complete produce, fetch and delete requests if the HW of the partition is incremented;
+   * otherwise, try to complete only delayed fetch requests.
+   *
+   * Note that this method may acquire many group locks, so the caller should NOT hold any group lock,
+   * in order to avoid deadlock.
+   * @param topicPartitions a map containing each partition and a flag indicating whether the HWM has changed
+   */
+  private[group] def completeDelayedRequests(topicPartitions: 
Map[TopicPartition, LeaderHwChange]): Unit =
+topicPartitions.foreach {
+  case (tp, leaderHWIncremented) => leaderHWIncremented match {
+case LeaderHwIncremented$ => 
groupManager.replicaManager.completeDelayedRequests(tp)

Review comment:
   will copy that!
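
For illustration, a minimal, self-contained Java sketch of the rule described in the doc comment in the diff above: the append path reports, per partition, whether the leader HW moved, and the caller completes delayed requests only for those partitions. All names below are hypothetical stand-ins, not the real Kafka types (the actual code under review is Scala in GroupCoordinator/ReplicaManager):

```java
import java.util.HashMap;
import java.util.Map;

public class DelayedRequestCompletionSketch {

    // Hypothetical stand-in for the per-partition flag the append path would return.
    enum LeaderHwChange { INCREMENTED, SAME }

    // Caller-side rule: complete the produce/fetch/delete purgatories only for
    // partitions whose high watermark actually advanced.
    static void completeDelayedRequests(final Map<String, LeaderHwChange> appendedPartitions) {
        appendedPartitions.forEach((partition, hwChange) -> {
            if (hwChange == LeaderHwChange.INCREMENTED) {
                System.out.println("completing delayed requests for " + partition);
            }
        });
    }

    public static void main(final String[] args) {
        final Map<String, LeaderHwChange> result = new HashMap<>();
        result.put("__consumer_offsets-7", LeaderHwChange.INCREMENTED);
        result.put("__consumer_offsets-8", LeaderHwChange.SAME);
        completeDelayedRequests(result);
    }
}
```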





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-07-30 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r463073842



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean reverseIteratorImplemented = false;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+//catch unsupported operation error
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+ 

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-07-30 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r463076565



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean reverseIteratorImplemented = false;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+//catch unsupported operation error
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+ 

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-07-30 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r463077196



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean reverseIteratorImplemented = false;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+//catch unsupported operation error
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+ 

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-07-30 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r463078058



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean reverseIteratorImplemented = false;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+//catch unsupported operation error
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+ 

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-07-30 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r463083346



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean reverseIteratorImplemented = false;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+//catch unsupported operation error
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+ 

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-07-30 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r463083821



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean reverseIteratorImplemented = false;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+//catch unsupported operation error
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+ 

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-07-30 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r463088582



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean reverseIteratorImplemented = false;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+//catch unsupported operation error
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+ 

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-07-30 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r463091437



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean reverseIteratorImplemented = false;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+//catch unsupported operation error
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+ 

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-07-30 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r463092579



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean reverseIteratorImplemented = false;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+//catch unsupported operation error
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+ 

[jira] [Commented] (KAFKA-10283) Consolidate client-level and consumer-level assignment within ClientState

2020-07-30 Thread highluck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168029#comment-17168029
 ] 

highluck commented on KAFKA-10283:
--

[~guozhang] 
Can I pick this issue?

Is there any particular structure you have in mind?

> Consolidate client-level and consumer-level assignment within ClientState
> -
>
> Key: KAFKA-10283
> URL: https://issues.apache.org/jira/browse/KAFKA-10283
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie++
>
> In StreamsPartitionAssignor, we do a two-level assignment: first on the 
> client level, and then, after that assignment is done, we further decide 
> within each client how to distribute tasks among its consumers if there are more.
> The {{ClientState}} class is used for book-keeping the assigned tasks, 
> however it is only used for the first level, while the second level is 
> handled outside of the class and we only keep track of the results in a few 
> maps for logging purposes. This leaves us with a bunch of hierarchical maps, 
> e.g. some on the client level and some on the consumer level.
> We would like to consolidate some of these maps into a single data structure 
> to better keep track of the assignment information, and to reduce the risk of 
> bugs that leave the assignment information inconsistent. 
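
To illustrate the kind of consolidation the description asks for, here is a hypothetical, minimal Java sketch in which a single per-client object keeps both the client-level task set and the per-consumer split, so the two views cannot drift apart. Plain ints stand in for Streams task ids; nothing here reflects the eventual design.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ClientAssignmentSketch {

    // One object per client holding the client-level task set and the per-consumer split together.
    static class ClientAssignment {
        final Set<Integer> clientTasks = new HashSet<>();
        final Map<String, Set<Integer>> consumerTasks = new HashMap<>();

        void assign(final String consumer, final int task) {
            clientTasks.add(task);
            consumerTasks.computeIfAbsent(consumer, c -> new HashSet<>()).add(task);
        }
    }

    public static void main(final String[] args) {
        final ClientAssignment client = new ClientAssignment();
        client.assign("consumer-1", 0);
        client.assign("consumer-1", 2);
        client.assign("consumer-2", 1);
        System.out.println("client-level tasks: " + client.clientTasks);
        System.out.println("consumer-level tasks: " + client.consumerTasks);
    }
}
```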



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mimaison commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

2020-07-30 Thread GitBox


mimaison commented on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-666512651


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

2020-07-30 Thread GitBox


vvcephei commented on a change in pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#discussion_r463128398



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##
@@ -181,23 +181,39 @@ void openDB(final ProcessorContext context) {
 throw new ProcessorStateException(fatal);
 }
 
-// Setup metrics before the database is opened, otherwise the metrics 
are not updated
+// Setup statistics before the database is opened, otherwise the 
statistics are not updated
 // with the measurements from Rocks DB
-maybeSetUpMetricsRecorder(configs);
+maybeSetUpStatistics(configs);
 
 openRocksDB(dbOptions, columnFamilyOptions);
 open = true;
+
+addValueProvidersToMetricsRecorder(configs);
 }
 
-private void maybeSetUpMetricsRecorder(final Map configs) {
-if (userSpecifiedOptions.statistics() == null &&
+private void maybeSetUpStatistics(final Map configs) {
+if (userSpecifiedOptions.statistics() != null) {
+userSpecifiedStatistics = true;
+}
+if (!userSpecifiedStatistics &&
 RecordingLevel.forName((String) 
configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) {
 
-isStatisticsRegistered = true;
 // metrics recorder will clean up statistics object
 final Statistics statistics = new Statistics();
 userSpecifiedOptions.setStatistics(statistics);
-metricsRecorder.addStatistics(name, statistics);
+}
+}
+
+private void addValueProvidersToMetricsRecorder(final Map 
configs) {
+final TableFormatConfig tableFormatConfig = 
userSpecifiedOptions.tableFormatConfig();
+final Statistics statistics = userSpecifiedStatistics ? null : 
userSpecifiedOptions.statistics();
+if (tableFormatConfig instanceof 
BlockBasedTableConfigWithAccessibleCache) {
+final Cache cache = ((BlockBasedTableConfigWithAccessibleCache) 
tableFormatConfig).blockCache();
+metricsRecorder.addValueProviders(name, db, cache, statistics);
+} else {
+metricsRecorder.addValueProviders(name, db, null, statistics);
+log.warn("A table format configuration is used that does not 
expose the block cache. This means " +
+"that metrics that relate to the block cache may be wrong if 
the block cache is shared.");
 }

Review comment:
   Ah, after reading your test, I now see the issue. I'd overlooked the 
fact that users would independently construct the table config object AND the 
cache. I see now that this makes it impossible to reliably capture the cache, 
since users have to actually choose to pass our special table config to the 
Options and then pass the Cache to that table config.
   
   This doesn't seem ideal. What do you think about just using reflection 
instead?
   
   ```suggestion
   if (tableFormatConfig instanceof BlockBasedTableConfig) {
   final BlockBasedTableConfig blockBasedTableConfig = 
(BlockBasedTableConfig) tableFormatConfig;
   try {
   final Field blockCacheField = 
BlockBasedTableConfig.class.getDeclaredField("blockCache_");
   blockCacheField.setAccessible(true);
   final Cache nullableBlockCache = (Cache) 
blockCacheField.get(blockBasedTableConfig);
   metricsRecorder.addValueProviders(name, db, 
nullableBlockCache, statistics);
   } catch (final NoSuchFieldException | IllegalAccessException | 
ClassCastException e) {
   log.warn("Expected to find and access field 'blockCache_' in 
BlockBasedTableConfig. " +
"Probably, an incompatible version of RocksDB 
is being used. " +
"Cache will be missing from memory metrics.", 
e);
   }
   } else {
   metricsRecorder.addValueProviders(name, db, null, statistics);
   }
   ```
   
   We would obviously test all the branches here to de-risk the reflection. We 
can also add a test that searches the classpath for implementations of 
TableFormatConfig to ensure we don't miss the memo if RocksDB adds a new 
TableFormatConfig implementation.
   
   Alternative thought, if you don't like the reflection: We would _also_ 
subclass Options and override `org.rocksdb.Options#setTableFormatConfig` to 
check if the passed `TableFormatConfig` is a `BlockBasedTableConfig`, and if 
so, then _we_ wrap it with `BlockBasedTableConfigWithAccessibleCache`.

##
File path: 
streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
##
@@ -227,7 +228,8 @@ public MockProcessorContext(final Properties config, final 
TaskId taskId, final
 this.metrics = new StreamsMetricsImpl(
 new Metrics(metricConfig),
 thre

[GitHub] [kafka] vvcephei commented on a change in pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

2020-07-30 Thread GitBox


vvcephei commented on a change in pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#discussion_r463128398



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##
@@ -181,23 +181,39 @@ void openDB(final ProcessorContext context) {
 throw new ProcessorStateException(fatal);
 }
 
-// Setup metrics before the database is opened, otherwise the metrics 
are not updated
+// Setup statistics before the database is opened, otherwise the 
statistics are not updated
 // with the measurements from Rocks DB
-maybeSetUpMetricsRecorder(configs);
+maybeSetUpStatistics(configs);
 
 openRocksDB(dbOptions, columnFamilyOptions);
 open = true;
+
+addValueProvidersToMetricsRecorder(configs);
 }
 
-private void maybeSetUpMetricsRecorder(final Map configs) {
-if (userSpecifiedOptions.statistics() == null &&
+private void maybeSetUpStatistics(final Map configs) {
+if (userSpecifiedOptions.statistics() != null) {
+userSpecifiedStatistics = true;
+}
+if (!userSpecifiedStatistics &&
 RecordingLevel.forName((String) 
configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) {
 
-isStatisticsRegistered = true;
 // metrics recorder will clean up statistics object
 final Statistics statistics = new Statistics();
 userSpecifiedOptions.setStatistics(statistics);
-metricsRecorder.addStatistics(name, statistics);
+}
+}
+
+private void addValueProvidersToMetricsRecorder(final Map 
configs) {
+final TableFormatConfig tableFormatConfig = 
userSpecifiedOptions.tableFormatConfig();
+final Statistics statistics = userSpecifiedStatistics ? null : 
userSpecifiedOptions.statistics();
+if (tableFormatConfig instanceof 
BlockBasedTableConfigWithAccessibleCache) {
+final Cache cache = ((BlockBasedTableConfigWithAccessibleCache) 
tableFormatConfig).blockCache();
+metricsRecorder.addValueProviders(name, db, cache, statistics);
+} else {
+metricsRecorder.addValueProviders(name, db, null, statistics);
+log.warn("A table format configuration is used that does not 
expose the block cache. This means " +
+"that metrics that relate to the block cache may be wrong if 
the block cache is shared.");
 }

Review comment:
   Ah, after reading your test, I now see the issue. I'd overlooked the 
fact that users would independently construct the table config object AND the 
cache. I see now that this makes it impossible to reliably capture the cache, 
since users have to actually choose to pass our special table config to the 
Options and then pass the Cache to that table config.
   
   This doesn't seem ideal. What do you think about just using reflection 
instead?
   
   ```suggestion
   if (tableFormatConfig instanceof BlockBasedTableConfig) {
   final BlockBasedTableConfig blockBasedTableConfig = 
(BlockBasedTableConfig) tableFormatConfig;
   try {
   final Field blockCacheField = 
BlockBasedTableConfig.class.getDeclaredField("blockCache_");
   blockCacheField.setAccessible(true);
   final Cache nullableBlockCache = (Cache) 
blockCacheField.get(blockBasedTableConfig);
   metricsRecorder.addValueProviders(name, db, 
nullableBlockCache, statistics);
   } catch (final NoSuchFieldException | IllegalAccessException | 
ClassCastException e) {
   log.warn("Expected to find and access field 'blockCache_' in 
BlockBasedTableConfig. " +
"Probably, an incompatible version of RocksDB 
is being used. " +
"Cache will be missing from memory metrics.", 
e);
   metricsRecorder.addValueProviders(name, db, null, 
statistics);
   }
   } else {
   metricsRecorder.addValueProviders(name, db, null, statistics);
   }
   ```
   
   We would obviously test all the branches here to de-risk the reflection. We 
can also add a test that searches the classpath for implementations of 
TableFormatConfig to ensure we don't miss the memo if RocksDB adds a new 
TableFormatConfig implementation.
   
   Alternative thought, if you don't like the reflection: We would _also_ 
subclass Options and override `org.rocksdb.Options#setTableFormatConfig` to 
check if the passed `TableFormatConfig` is a `BlockBasedTableConfig`, and if 
so, then _we_ wrap it with `BlockBasedTableConfigWithAccessibleCache`.
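   For reference, a rough sketch of what such a cache-exposing table config can 
look like (illustrative only; the real class in Kafka Streams may differ in 
detail). Note it only captures a cache that is set _through_ it, which is 
exactly why it cannot help when users construct a plain `BlockBasedTableConfig` 
themselves:
   
   ```java
   import org.rocksdb.BlockBasedTableConfig;
   import org.rocksdb.Cache;
   
   // Remembers the block cache handed to setBlockCache() so that the metrics
   // recorder can read it back later via blockCache().
   public class BlockBasedTableConfigWithAccessibleCache extends BlockBasedTableConfig {
   
       private Cache blockCache = null;
   
       @Override
       public BlockBasedTableConfig setBlockCache(final Cache cache) {
           blockCache = cache;
           return super.setBlockCache(cache);
       }
   
       public Cache blockCache() {
           return blockCache;
       }
   }
   ```
   
   Under the "subclass Options" alternative, the overridden `setTableFormatConfig` 
would detect a plain `BlockBasedTableConfig` and swap in such a wrapper, but 
copying over a cache that was already set runs into the same visibility problem 
that motivates the reflection in the first place.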





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[GitHub] [kafka] mimaison commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

2020-07-30 Thread GitBox


mimaison commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r463097918



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -367,14 +406,37 @@ public void testOneWayReplicationWithAutorOffsetSync1() 
throws InterruptedExcept
 time.sleep(5000);
 
 // create a consumer at backup cluster with same consumer group Id to 
consume old and new topic
-consumer = 
backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-"group.id", "consumer-group-1"), "primary.test-topic-1", 
"primary.test-topic-2");
+consumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps, 
"primary.test-topic-1", "primary.test-topic-2");
 
 records = consumer.poll(Duration.ofMillis(500));
 // similar reasoning as above, no more records to consume by the same 
consumer group at backup cluster
 assertEquals("consumer record size is not zero", 0, records.count());
 consumer.close();
+}
+
+private void produceMessages(EmbeddedConnectCluster cluster, String 
topicName, int partitions, String msgPrefix) {
+for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+// produce to all partitions but the last one

Review comment:
   This comment needs updating

##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -190,24 +211,19 @@ public void close() {
 public void testReplication() throws InterruptedException {
 
 // create consumers before starting the connectors so we don't need to 
wait for discovery
-Consumer consumer1 = 
primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-"group.id", "consumer-group-1"), "test-topic-1", 
"backup.test-topic-1");
+Consumer consumer1 = 
primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1", 
"backup.test-topic-1");
 consumer1.poll(Duration.ofMillis(500));
 consumer1.commitSync();
 consumer1.close();
 
-Consumer consumer2 = 
backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-"group.id", "consumer-group-1"), "test-topic-1", 
"primary.test-topic-1");
+Consumer consumer2 = 
backup.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1", 
"primary.test-topic-1");

Review comment:
   Do we still need these 2 blocks? In `setup()` we already consumed all 
messages

##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -128,10 +136,23 @@ public void setup() throws InterruptedException {
 backup.kafka().createTopic("primary.test-topic-1", 1);
 backup.kafka().createTopic("heartbeats", 1);
 
-for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
-primary.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", 
"message-1-" + i);
-backup.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", 
"message-2-" + i);
-}
+// produce to all partitions but the last one

Review comment:
   Would it be better to use a separate topic in order to keep a partition 
without any records? Changing this topic affects the existing checks in all the 
tests.
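   Something along these lines, perhaps (topic names are hypothetical), so the 
empty-partition case lives on its own topic and the existing `test-topic-1` 
checks stay untouched:
   
   ```java
   // hypothetical: a dedicated topic whose last partitions never receive records
   primary.kafka().createTopic("test-topic-empty", NUM_PARTITIONS);
   backup.kafka().createTopic("primary.test-topic-empty", NUM_PARTITIONS);
   // only produce to partition 0, so offset translation for empty partitions
   // can be asserted without changing what the other tests expect
   primary.kafka().produce("test-topic-empty", 0, "key", "message-empty-1");
   ```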

##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -128,10 +136,23 @@ public void setup() throws InterruptedException {
 backup.kafka().createTopic("primary.test-topic-1", 1);
 backup.kafka().createTopic("heartbeats", 1);
 
-for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
-primary.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", 
"message-1-" + i);
-backup.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", 
"message-2-" + i);
-}
+// produce to all partitions but the last one
+produceMessages(primary, "test-topic-1", NUM_PARTITIONS - 1, 
"message-1-");
+produceMessages(backup, "test-topic-1", NUM_PARTITIONS - 1, 
"message-2-");
+
+consumerProps = new HashMap() {{

Review comment:
   As this does not change, I wonder if we could directly initialize 
`consumerProps` where it's declared
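   For example (a sketch only; the `group.id` entry is just a placeholder for 
whatever the double-brace block currently sets):
   
   ```java
   // initialize the field at declaration instead of double-brace init in setup()
   private final Map<String, Object> consumerProps = new HashMap<>(
       Collections.singletonMap("group.id", "consumer-group-1"));
   ```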

##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -244,26 +251,50 @@ public void testReplication() throws InterruptedException 
{
 
 assertTrue("Offsets not translated downstream to backup cluster. 
Found: " + backupOffsets, backupOffsets.containsKey(
 new TopicPartition("primary.test-topic-1", 0)));
+assertTrue("Offset of empty partition not translated downstream to 
backup cluster. Found: " + backupOffsets, backupOffsets.containsKey(
+new TopicPartition("p

[GitHub] [kafka] mimaison commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

2020-07-30 Thread GitBox


mimaison commented on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-666526465


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

2020-07-30 Thread GitBox


mumrah commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-666531948


   These test failures are known flaky tests which already have jira tickets



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] highluck opened a new pull request #9105: MINOR: closable object Memory leak prevention

2020-07-30 Thread GitBox


highluck opened a new pull request #9105:
URL: https://github.com/apache/kafka/pull/9105


   closable object Memory leak prevention
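   
   For context, the general pattern this aims at (a generic illustration with 
assumed names, not necessarily the exact code touched by this PR): close 
`Closeable` resources deterministically, e.g. with try-with-resources, so they 
are released even when an exception is thrown. Here `props` is assumed to be a 
valid consumer configuration:
   
   ```java
   // generic illustration: the consumer is closed even if poll() throws
   try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
       consumer.subscribe(Collections.singletonList("input-topic"));
       ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
       records.forEach(record -> System.out.println(record.value()));
   }  // close() is invoked here automatically
   ```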
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah merged pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

2020-07-30 Thread GitBox


mumrah merged pull request #9008:
URL: https://github.com/apache/kafka/pull/9008


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10283) Consolidate client-level and consumer-level assignment within ClientState

2020-07-30 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168100#comment-17168100
 ] 

Guozhang Wang commented on KAFKA-10283:
---

Sure [~high.lee]. I'd suggest you get familiar with the classes related to task 
assignment, StreamsPartitionAssignor and HighAvailabilityAssignor, especially the 
two-phase logic of 1) first assigning across clients, and then 2) assigning within 
each client across its consumers. Then you can propose a way to 
consolidate the data structure usage for these two phases. 
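
A purely illustrative sketch of the direction (hypothetical shape, not the 
current ClientState API): let ClientState own the per-consumer breakdown 
instead of keeping it in side maps inside the assignor:

{code:java}
// hypothetical consolidation sketch
class ClientState {
    // phase 1: tasks assigned to the client as a whole
    private final Set<TaskId> activeTasks = new HashSet<>();
    private final Set<TaskId> standbyTasks = new HashSet<>();

    // phase 2: the same tasks broken down by consumer (member id)
    private final Map<String, Set<TaskId>> activeTasksPerConsumer = new HashMap<>();
    private final Map<String, Set<TaskId>> standbyTasksPerConsumer = new HashMap<>();
}
{code}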

> Consolidate client-level and consumer-level assignment within ClientState
> -
>
> Key: KAFKA-10283
> URL: https://issues.apache.org/jira/browse/KAFKA-10283
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie++
>
> In StreamsPartitionAssignor, we do a two-level assignment, one on the 
> client-level, and then after the assignment is done we further decide within 
> the client how to distributed among consumers if there are more.
> The {{ClientState}} class is used for book-keeping the assigned tasks, 
> however it is only used for the first level, while for the second level it is 
> done outside of the class and we only keep track of the results in a few maps 
> for logging purposes. This leaves us with a bunch of hierarchical maps, e.g. 
> some on the client level and some on the consumer level.
> We would like to consolidate some of these maps into a single data structure 
> for better keeping track of the assignment information, and also for less bug 
> vulnerability causing the assignment information to be inconsistent. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] abbccdda commented on pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol

2020-07-30 Thread GitBox


abbccdda commented on pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#issuecomment-666590526


   @mimaison Could you rebase?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10328) MockConsumer behaves differently than KafkaConsumer

2020-07-30 Thread Jeff Kim (Jira)
Jeff Kim created KAFKA-10328:


 Summary: MockConsumer behaves differently than KafkaConsumer
 Key: KAFKA-10328
 URL: https://issues.apache.org/jira/browse/KAFKA-10328
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Jeff Kim
Assignee: Jeff Kim


The behavior of `MockConsumer` is different from that of `KafkaConsumer` under 
multi-threaded access.

MockConsumer should throw `ConcurrentModificationException` instead of 
synchronizing all methods.
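
For reference, KafkaConsumer rejects concurrent access with a light-weight 
acquire/release check rather than locks; a simplified sketch of the fields and 
methods involved (names simplified, not the actual implementation):

{code:java}
private static final long NO_CURRENT_THREAD = -1L;
private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
private int refCount = 0;

private void acquire() {
    final long threadId = Thread.currentThread().getId();
    if (threadId != currentThread.get() &&
            !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
        throw new ConcurrentModificationException(
            "MockConsumer is not safe for multi-threaded access");
    refCount++;
}

private void release() {
    if (--refCount == 0)
        currentThread.set(NO_CURRENT_THREAD);
}
{code}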



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rhauch opened a new pull request #9106: KAFKA-10146, KAFKA-9066: Retain metrics for failed tasks (#8502)

2020-07-30 Thread GitBox


rhauch opened a new pull request #9106:
URL: https://github.com/apache/kafka/pull/9106


   Targets the `2.4` branch. See #8854 for the similar PR for the `2.5` branch, 
which is currently blocked by the 2.5.1 release effort.
   
   This backports the KAFKA-9066 / #8502 changes to retain metrics for failed 
tasks, which were already merged to trunk and backported to the 2.6 branch.
   
   Like #8854, this PR has one change relative to the original PR: it removes 
an integration test added in the 2.6 branch for KIP-158 and modified as part of 
KAFKA-9066 / #8502.
   
   Author: Chris Egerton 
   Reviewers: Nigel Liang , Randall Hauch 

   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10322) InMemoryWindowStore restore keys format incompatibility (lack of sequenceNumber in keys on topic)

2020-07-30 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168155#comment-17168155
 ] 

Sophie Blee-Goldman commented on KAFKA-10322:
-

Hey [~tbradlo]

Thanks for the bug report. I agree, this is obviously incorrect. What I'm not 
yet understanding is why we don't run into this problem all the time -- the 
WindowKeySchema methods are used pretty heavily by RocksDBWindowStore as well, 
and from a glance they all seem to assume that the serialized bytes include a 
sequence number.

I'll try to set up a test to figure out the true extent of this problem. Would 
you be interested in submitting a PR with the fix?

> InMemoryWindowStore restore keys format incompatibility (lack of 
> sequenceNumber in keys on topic)
> -
>
> Key: KAFKA-10322
> URL: https://issues.apache.org/jira/browse/KAFKA-10322
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
> Environment: windows/linux
>Reporter: Tomasz Bradło
>Priority: Major
>
> I have regular groupBy&Counting stream configuration:
> {code:java}
>
> fun addStream(kStreamBuilder: StreamsBuilder) {
> val storeSupplier = Stores.inMemoryWindowStore("count-store",
> Duration.ofDays(10),
> Duration.ofDays(1),
> false)
> val storeBuilder: StoreBuilder> = 
> Stores
> .windowStoreBuilder(storeSupplier, 
> JsonSerde(CountableEvent::class.java), Serdes.Long())
> kStreamBuilder
> .stream("input-topic", Consumed.with(Serdes.String(), 
> Serdes.String()))
> .map {_, jsonRepresentation -> 
> KeyValue(eventsCountingDeserializer.deserialize(jsonRepresentation), null)}
> .groupByKey()
> .windowedBy(TimeWindows.of(Duration.ofDays(1)))
> 
> .count(Materialized.with(JsonSerde(CountableEvent::class.java), 
> Serdes.Long()))
> .toStream()
> .to("topic1-count")
> val storeConsumed = 
> Consumed.with(WindowedSerdes.TimeWindowedSerde(JsonSerde(CountableEvent::class.java),
>  Duration.ofDays(1).toMillis()), Serdes.Long())
> kStreamBuilder.addGlobalStore(storeBuilder, "topic1-count", 
> storeConsumed, passThroughProcessorSupplier)
> }{code}
> While sending to "topic1-count", for serializing the key 
> [TimeWindowedSerializer|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java]
>  is used which is using 
> [WindowKeySchema.toBinary|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L112]
>  so the message key format is:
> {code:java}
> real_grouping_key + timestamp(8bytes){code}
>  
> Everything works. I can get correct values from state-store. But, in recovery 
> scenario, when [GlobalStateManagerImpl 
> |https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L317]enters
>  offset < highWatermark loop then
> [InMemoryWindowStore stateRestoreCallback 
> |https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java#L105]reads
>  from "topic1-count" and fails to extract valid key and timestamp using 
> [WindowKeySchema.extractStoreKeyBytes 
> |https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L188]and
>  [WindowKeySchema.extractStoreTimestamp. 
> |https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L201]It
>  fails because it expects format:
> {code:java}
> real_grouping_key + timestamp(8bytes) + sequence_number(4bytes) {code}
> How is this supposed to work in this case?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10146) Backport KAFKA-9066 to 2.5 and 2.4 branches

2020-07-30 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168157#comment-17168157
 ] 

Randall Hauch commented on KAFKA-10146:
---

Since the 2.5.1 release is still ongoing, the PR for the 2.5 branch 
(https://github.com/apache/kafka/pull/8854) is blocked.

Since the 2.4 branch is unblocked and we want to get this feature into the next 
2.4.x release, I created a different PR targeting the 2.4 branch 
(https://github.com/apache/kafka/pull/9106).

> Backport KAFKA-9066 to 2.5 and 2.4 branches
> ---
>
> Key: KAFKA-10146
> URL: https://issues.apache.org/jira/browse/KAFKA-10146
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Major
>  Labels: backport
> Fix For: 2.4.2, 2.5.2
>
>
> KAFKA-9066 was merged on the same day we were trying to release 2.5.1, so 
> this was not backported at the time. However, once 2.5.1 is out the door, the 
> `775f0d484` commit on `trunk` should be backported to the `2.5` and `2.4` 
> branches.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rhauch commented on pull request #9106: KAFKA-10146, KAFKA-9066: Retain metrics for failed tasks (backport to 2.4)

2020-07-30 Thread GitBox


rhauch commented on pull request #9106:
URL: https://github.com/apache/kafka/pull/9106#issuecomment-11491


   @kkonstantine, here is a PR for the `2.4` branch that is a cherry-pick of 
the same one commit from #8854 that you've already approved.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10329) Enable connector context in logs by default

2020-07-30 Thread Randall Hauch (Jira)
Randall Hauch created KAFKA-10329:
-

 Summary: Enable connector context in logs by default
 Key: KAFKA-10329
 URL: https://issues.apache.org/jira/browse/KAFKA-10329
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 3.0.0
Reporter: Randall Hauch
 Fix For: 3.0.0


When 
[KIP-449|https://cwiki.apache.org/confluence/display/KAFKA/KIP-449%3A+Add+connector+contexts+to+Connect+worker+logs]
 was implemented and released as part of AK 2.3, we chose not to enable this 
extra logging context information by default because it was not backward 
compatible: anyone relying upon the `connect-log4j.properties` file provided by 
the AK distribution would, after an upgrade to AK 2.3 (or later), see a 
different format for their logs, which could break any log processing 
functionality they were relying upon.

However, we should enable this in AK 3.0, whenever that comes. Doing so will 
require a fairly minor KIP to change the `connect-log4j.properties` file 
slightly.

Marked this as BLOCKER since it's a backward incompatible change that we 
definitely want to do in the 3.0.0 release.
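
For illustration, the change would be on the order of adding the connector 
context MDC to the layout in `connect-log4j.properties` (the exact pattern 
would be defined by the KIP):

{code}
# illustrative only; the final pattern is subject to the KIP
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m (%c:%L)%n
{code}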



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10322) InMemoryWindowStore restore keys format incompatibility (lack of sequenceNumber in keys on topic)

2020-07-30 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-10322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168167#comment-17168167
 ] 

Tomasz Bradło commented on KAFKA-10322:
---

Hi [~ableegoldman] 

I don't know what the expected behaviour is here. Should the recovery lambdas (in 
both InMemoryWindowStore and RocksDBWindowStore) assume that the keys read from 
the topic contain no sequence_number?

> InMemoryWindowStore restore keys format incompatibility (lack of 
> sequenceNumber in keys on topic)
> -
>
> Key: KAFKA-10322
> URL: https://issues.apache.org/jira/browse/KAFKA-10322
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
> Environment: windows/linux
>Reporter: Tomasz Bradło
>Priority: Major
>
> I have regular groupBy&Counting stream configuration:
> {code:java}
>
> fun addStream(kStreamBuilder: StreamsBuilder) {
> val storeSupplier = Stores.inMemoryWindowStore("count-store",
> Duration.ofDays(10),
> Duration.ofDays(1),
> false)
> val storeBuilder: StoreBuilder> = 
> Stores
> .windowStoreBuilder(storeSupplier, 
> JsonSerde(CountableEvent::class.java), Serdes.Long())
> kStreamBuilder
> .stream("input-topic", Consumed.with(Serdes.String(), 
> Serdes.String()))
> .map {_, jsonRepresentation -> 
> KeyValue(eventsCountingDeserializer.deserialize(jsonRepresentation), null)}
> .groupByKey()
> .windowedBy(TimeWindows.of(Duration.ofDays(1)))
> 
> .count(Materialized.with(JsonSerde(CountableEvent::class.java), 
> Serdes.Long()))
> .toStream()
> .to("topic1-count")
> val storeConsumed = 
> Consumed.with(WindowedSerdes.TimeWindowedSerde(JsonSerde(CountableEvent::class.java),
>  Duration.ofDays(1).toMillis()), Serdes.Long())
> kStreamBuilder.addGlobalStore(storeBuilder, "topic1-count", 
> storeConsumed, passThroughProcessorSupplier)
> }{code}
> While sending to "topic1-count", for serializing the key 
> [TimeWindowedSerializer|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java]
>  is used which is using 
> [WindowKeySchema.toBinary|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L112]
>  so the message key format is:
> {code:java}
> real_grouping_key + timestamp(8bytes){code}
>  
> Everything works. I can get correct values from state-store. But, in recovery 
> scenario, when [GlobalStateManagerImpl 
> |https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L317]enters
>  offset < highWatermark loop then
> [InMemoryWindowStore stateRestoreCallback 
> |https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java#L105]reads
>  from "topic1-count" and fails to extract valid key and timestamp using 
> [WindowKeySchema.extractStoreKeyBytes 
> |https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L188]and
>  [WindowKeySchema.extractStoreTimestamp. 
> |https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L201]It
>  fails because it expects format:
> {code:java}
> real_grouping_key + timestamp(8bytes) + sequence_number(4bytes) {code}
> How is this supposed to work in this case?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] abbccdda commented on a change in pull request #9096: MINOR: Add comments to constrainedAssign and generalAssign method

2020-07-30 Thread GitBox


abbccdda commented on a change in pull request #9096:
URL: https://github.com/apache/kafka/pull/9096#discussion_r463260878



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -163,6 +183,7 @@ private boolean allSubscriptionsEqual(Set allTopics,
 Map> assignment = new HashMap<>(
 
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(minQuota;
 
+// step 1: Reassign as many previously owned partitions as possible

Review comment:
   nit: I don't think we need numbered steps here; they make it hard to squeeze 
in more comments later.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison merged pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

2020-07-30 Thread GitBox


mimaison merged pull request #9029:
URL: https://github.com/apache/kafka/pull/9029


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10017) Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta

2020-07-30 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168214#comment-17168214
 ] 

Mickael Maison commented on KAFKA-10017:


A couple more failures:
- https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1780/consoleFull
- https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1771/consoleFull

> Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta
> ---
>
> Key: KAFKA-10017
> URL: https://issues.apache.org/jira/browse/KAFKA-10017
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Blocker
>  Labels: flaky-test, unit-test
> Fix For: 2.6.0
>
>
> Creating a new ticket for this since the root cause is different than 
> https://issues.apache.org/jira/browse/KAFKA-9966
> With injectError = true:
> h3. Stacktrace
> java.lang.AssertionError: Did not receive all 20 records from topic 
> multiPartitionOutputTopic within 6 ms Expected: is a value equal to or 
> greater than <20> but: <15> was less than <20> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mimaison commented on pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol

2020-07-30 Thread GitBox


mimaison commented on pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#issuecomment-666701712


   @abbccdda Done!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10322) InMemoryWindowStore restore keys format incompatibility (lack of sequenceNumber in keys on topic)

2020-07-30 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168231#comment-17168231
 ] 

Sophie Blee-Goldman commented on KAFKA-10322:
-

Ok, I dug into this a bit further and found that the changelogging layer will 
actually _always_ insert a sequence number, even when it's not necessary (i.e. 
retainDuplicates is false). On restoration this extra sequence number is 
dropped, so the correct bytes are ultimately inserted into the store. 

While this is obviously a bug in that we're storing an extra 4 bytes in the 
changelog, it seems like the key extraction should technically work on 
restoration. Did you hit an exception or other error after a restore, or did 
you just notice that something was wrong while looking at the source? 
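
To make the two layouts explicit (a sketch based on the description below, not 
the actual WindowKeySchema code; variable names are placeholders):

{code:java}
// key written to the changelog by TimeWindowedSerializer / WindowKeySchema.toBinary:
//   [ serialized key ][ 8-byte window timestamp ]
//
// layout expected by extractStoreKeyBytes / extractStoreTimestamp on restore:
//   [ serialized key ][ 8-byte window timestamp ][ 4-byte sequence number ]
final ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + 8 + 4);
buf.put(serializedKey);            // the grouping key bytes
buf.putLong(windowTimestamp);      // 8 bytes
buf.putInt(sequenceNumber);        // the 4 bytes restore expects but toBinary never writes
{code}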

> InMemoryWindowStore restore keys format incompatibility (lack of 
> sequenceNumber in keys on topic)
> -
>
> Key: KAFKA-10322
> URL: https://issues.apache.org/jira/browse/KAFKA-10322
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
> Environment: windows/linux
>Reporter: Tomasz Bradło
>Priority: Major
>
> I have regular groupBy&Counting stream configuration:
> {code:java}
>
> fun addStream(kStreamBuilder: StreamsBuilder) {
> val storeSupplier = Stores.inMemoryWindowStore("count-store",
> Duration.ofDays(10),
> Duration.ofDays(1),
> false)
> val storeBuilder: StoreBuilder> = 
> Stores
> .windowStoreBuilder(storeSupplier, 
> JsonSerde(CountableEvent::class.java), Serdes.Long())
> kStreamBuilder
> .stream("input-topic", Consumed.with(Serdes.String(), 
> Serdes.String()))
> .map {_, jsonRepresentation -> 
> KeyValue(eventsCountingDeserializer.deserialize(jsonRepresentation), null)}
> .groupByKey()
> .windowedBy(TimeWindows.of(Duration.ofDays(1)))
> 
> .count(Materialized.with(JsonSerde(CountableEvent::class.java), 
> Serdes.Long()))
> .toStream()
> .to("topic1-count")
> val storeConsumed = 
> Consumed.with(WindowedSerdes.TimeWindowedSerde(JsonSerde(CountableEvent::class.java),
>  Duration.ofDays(1).toMillis()), Serdes.Long())
> kStreamBuilder.addGlobalStore(storeBuilder, "topic1-count", 
> storeConsumed, passThroughProcessorSupplier)
> }{code}
> While sending to "topic1-count", for serializing the key 
> [TimeWindowedSerializer|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java]
>  is used which is using 
> [WindowKeySchema.toBinary|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L112]
>  so the message key format is:
> {code:java}
> real_grouping_key + timestamp(8bytes){code}
>  
> Everything works. I can get correct values from state-store. But, in recovery 
> scenario, when [GlobalStateManagerImpl 
> |https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L317]enters
>  offset < highWatermark loop then
> [InMemoryWindowStore stateRestoreCallback 
> |https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java#L105]reads
>  from "topic1-count" and fails to extract valid key and timestamp using 
> [WindowKeySchema.extractStoreKeyBytes 
> |https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L188]and
>  [WindowKeySchema.extractStoreTimestamp. 
> |https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L201]It
>  fails because it expects format:
> {code:java}
> real_grouping_key + timestamp(8bytes) + sequence_number(4bytes) {code}
> How is this supposed to work in this case?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10137) Clean-up retain Duplicate logic in Window Stores

2020-07-30 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168234#comment-17168234
 ] 

Sophie Blee-Goldman commented on KAFKA-10137:
-

I think there's actually a real bug lurking here: I was looking at the 
ChangeLoggingWindowBytesStore and noticed we seem to insert the sequence number 
into the changelogged bytes regardless of `retainDuplicates`. 

We peel off the unnecessary seqnum during restoration, so it doesn't seem to 
cause any correctness issues. But we're obviously storing an extra 4 bytes per 
window store changelog record for no reason. Unfortunately, I'm not sure how 
this can be fixed in a backwards-compatible way.

> Clean-up retain Duplicate logic in Window Stores
> 
>
> Key: KAFKA-10137
> URL: https://issues.apache.org/jira/browse/KAFKA-10137
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Bruno Cadonna
>Priority: Minor
>
> Stream-stream joins use the regular `WindowStore` implementation but with 
> `retainDuplicates` set to true. To allow for duplicates while using the same 
> unique-key underlying stores we just wrap the key with an incrementing 
> sequence number before inserting it.
> The logic to maintain and append the sequence number is present in multiple 
> locations, namely in the changelogging window store and in its underlying 
> window stores. We should consolidate this code to one single location.  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

