[jira] [Commented] (KAFKA-10266) Fix connector configs in docs to mention the correct default value inherited from worker configs
[ https://issues.apache.org/jira/browse/KAFKA-10266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167681#comment-17167681 ] Luke Chen commented on KAFKA-10266: --- [PR: https://github.com/apache/kafka/pull/9104|https://github.com/apache/kafka/pull/9104] > Fix connector configs in docs to mention the correct default value inherited > from worker configs > > > Key: KAFKA-10266 > URL: https://issues.apache.org/jira/browse/KAFKA-10266 > Project: Kafka > Issue Type: Bug >Reporter: Konstantine Karantasis >Assignee: Luke Chen >Priority: Major > > > Example: > [https://kafka.apache.org/documentation/#header.converter] > has the correct default when it is mentioned as a worker property. > But under the section of source connector configs, it's default value is said > to be `null`. > Though that is correct in terms of implementation, it's confusing for users. > We should surface the correct defaults for configs that inherit (or otherwise > override) worker configs. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ijuma commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
ijuma commented on pull request #9008: URL: https://github.com/apache/kafka/pull/9008#issuecomment-665828744 What were the throughput numbers? I assume you meant the consumer perf test, not the console consumer. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda merged pull request #9081: KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should not block infinitively
abbccdda merged pull request #9081: URL: https://github.com/apache/kafka/pull/9081 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
mumrah commented on pull request #9008: URL: https://github.com/apache/kafka/pull/9008#issuecomment-665845747 @ijuma you're right, I meant the consumer perf test. I updated my comment to clarify. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
abbccdda commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r462453975 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java ## @@ -1214,6 +1215,71 @@ default AlterClientQuotasResult alterClientQuotas(Collection entries, AlterClientQuotasOptions options); +/** + * Describes finalized as well as supported features. By default, the request is issued to any + * broker. It can be optionally directed only to the controller via DescribeFeaturesOptions + * parameter. This is particularly useful if the user requires strongly consistent reads of + * finalized features. + * + * The following exceptions can be anticipated when calling {@code get()} on the future from the + * returned {@link DescribeFeaturesResult}: + * + * {@link org.apache.kafka.common.errors.TimeoutException} + * If the request timed out before the describe operation could finish. + * + * + * @param options the options to use + * + * @return the {@link DescribeFeaturesResult} containing the result + */ +DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options); Review comment: Note in the post-KIP-500 world, this feature could still work, but the request must be redirected to the controller inherently on the broker side, instead of sending it directly. So in the comment, we may try to phrase it to convey the principal is that `the request must be handled by the controller` instead of `the admin client must send this request to the controller`. ## File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java ## @@ -1214,6 +1215,70 @@ default AlterClientQuotasResult alterClientQuotas(Collection entries, AlterClientQuotasOptions options); +/** + * Describes finalized as well as supported features. By default, the request is issued to any + * broker. It can be optionally directed only to the controller via DescribeFeaturesOptions + * parameter. This is particularly useful if the user requires strongly consistent reads of + * finalized features. + * + * The following exceptions can be anticipated when calling {@code get()} on the future from the + * returned {@link DescribeFeaturesResult}: + * + * {@link org.apache.kafka.common.errors.TimeoutException} + * If the request timed out before the describe operation could finish. + * + * + * @param options the options to use + * + * @return the {@link DescribeFeaturesResult} containing the result + */ +DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options); + +/** + * Applies specified updates to finalized features. This operation is not transactional so it + * may succeed for some features while fail for others. 
+ * + * The API takes in a map of finalized feature name to {@link FeatureUpdate} that need to be Review comment: nit: s/name/names ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -983,8 +1144,25 @@ class KafkaController(val config: KafkaConfig, */ private[controller] def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicPartition]): Unit = { try { + val filteredBrokers = scala.collection.mutable.Set[Int]() ++ brokers + if (config.isFeatureVersioningEnabled) { +def hasIncompatibleFeatures(broker: Broker): Boolean = { + val latestFinalizedFeatures = featureCache.get + if (latestFinalizedFeatures.isDefined) { +BrokerFeatures.hasIncompatibleFeatures(broker.features, latestFinalizedFeatures.get.features) + } else { +false + } +} +controllerContext.liveOrShuttingDownBrokers.foreach(broker => { + if (filteredBrokers.contains(broker.id) && hasIncompatibleFeatures(broker)) { Review comment: I see, what would happen to a currently live broker if it couldn't get any metadata update for a while, will it shut down itself? ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java ## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either exp
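For context, a hedged usage sketch of the `Admin#describeFeatures` API under review in this message. The option setter `sendRequestToController` and the result accessor `featureMetadata()` are assumptions for illustration only; the exact names are still subject to this PR/KIP-584 review.
```java
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeFeaturesOptions;
import org.apache.kafka.clients.admin.DescribeFeaturesResult;

public class DescribeFeaturesSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Direct the read to the controller for strongly consistent finalized features
            // (option name assumed here; see the DescribeFeaturesOptions class in the PR).
            DescribeFeaturesOptions options = new DescribeFeaturesOptions().sendRequestToController(true);
            DescribeFeaturesResult result = admin.describeFeatures(options);
            // The returned future may complete exceptionally, e.g. with a TimeoutException.
            System.out.println(result.featureMetadata().get());
        }
    }
}
```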
[GitHub] [kafka] abbccdda closed pull request #8940: KAFKA-10181: AlterConfig/IncrementalAlterConfig should route to the controller for non validation calls
abbccdda closed pull request #8940: URL: https://github.com/apache/kafka/pull/8940 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
hachikuji commented on pull request #9008: URL: https://github.com/apache/kafka/pull/9008#issuecomment-665938546 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon opened a new pull request #9104: KAFKA-10266: Update the connector config header.converter
showuon opened a new pull request #9104: URL: https://github.com/apache/kafka/pull/9104 After my update, the original incorrect statement will be removed. > By default, the SimpleHeaderConverter is used to . And it'll be replaced with the following, with a hyperlink to the worker config's header.converter section. > By default, the value is null and the Connect config will be used. Also, the default value is updated to **Inherited from Connect config** ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a change in pull request #9050: KAFKA-10193: Add preemption for controller events that have callbacks
jsancio commented on a change in pull request #9050: URL: https://github.com/apache/kafka/pull/9050#discussion_r462489390 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -2090,6 +2093,12 @@ case class ReplicaLeaderElection( callback: ElectLeadersCallback = _ => {} ) extends ControllerEvent { override def state: ControllerState = ControllerState.ManualLeaderBalance + + override def preempt(): Unit = callback( +partitionsFromAdminClientOpt.fold(Map.empty[TopicPartition, Either[ApiError, Int]]) { partitions => Review comment: Yeah. The value returned by the `fold` is passed to `callback`. `foreach` would return `Unit`. ## File path: core/src/main/scala/kafka/controller/ControllerEventManager.scala ## @@ -140,6 +143,9 @@ class ControllerEventManager(controllerId: Int, } } + // for testing + private[controller] def setControllerEventThread(thread: ControllerEventThread): Unit = this.thread = thread Review comment: We can remove this since we have `private[controller] var thread` with the same visibility. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] apovzner commented on a change in pull request #9072: KAFKA-10162; Make the rate based quota behave more like a Token Bucket (KIP-599, Part III)
apovzner commented on a change in pull request #9072: URL: https://github.com/apache/kafka/pull/9072#discussion_r462645570 ## File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java ## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.metrics.stats; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.kafka.common.metrics.MetricConfig; + +/** + * The {@link TokenBucket} is a {@link SampledStat} implementing a token bucket that can be used + * in conjunction with a {@link Rate} to enforce a quota. + * + * A token bucket accumulates tokens with a constant rate R, one token per 1/R second, up to a + * maximum burst size B. The burst size essentially means that we keep permission to do a unit of + * work for up to B / R seconds. + * + * The {@link TokenBucket} adapts this to fit within a {@link SampledStat}. It accumulates tokens + * in chunks of Q units by default (sample length * quota) instead of one token every 1/R second + * and expires in chunks of Q units too (when the oldest sample is expired). + * + * Internally, we achieve this behavior by not completing the current sample until we fill that + * sample up to Q (used all credits). Samples are filled up one after the others until the maximum + * number of samples is reached. If it is not possible to created a new sample, we accumulate in + * the last one until a new one can be created. The over used credits are spilled over to the new + * sample at when it is created. Every time a sample is purged, Q credits are made available. + * + * It is important to note that the maximum burst is not enforced in the class and depends on + * how the quota is enforced in the {@link Rate}. + */ +public class TokenBucket extends SampledStat { + +private final TimeUnit unit; + +/** + * Instantiates a new TokenBucket that works by default with a Quota {@link MetricConfig#quota()} + * in {@link TimeUnit#SECONDS}. + */ +public TokenBucket() { +this(TimeUnit.SECONDS); +} + +/** + * Instantiates a new TokenBucket that works with the provided time unit. + * + * @param unit The time unit of the Quota {@link MetricConfig#quota()} + */ +public TokenBucket(TimeUnit unit) { +super(0); +this.unit = unit; +} + +@Override +public void record(MetricConfig config, double value, long timeMs) { Review comment: @junrao Regarding "With this change, if we record a large value, the observed effect of the value could last much longer than the number of samples. " -- This will not happen with this approach. If we record a very large value, we never move to the bucket with timestamp > current timestamp (of the recording time). 
This approach can only add the value to older buckets, which did not expire, but never to the buckets "in the future". For example, if we only have 2 samples, and perSampleQuota = 5, and say we already filled in both buckets up to quota: [5, 5]. If new requests arrive but the timestamp did not move past the last bucket, we are going to be adding this value to the last bucket, for example getting to [5, 20] if we recorded 15. If the next recording happens after the time moved passed the last bucket, say we record 3, then buckets will look like [20, 3]. ## File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java ## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOU
[GitHub] [kafka] guozhangwang commented on pull request #9095: KAFKA-10321: fix infinite blocking for global stream thread startup
guozhangwang commented on pull request #9095: URL: https://github.com/apache/kafka/pull/9095#issuecomment-665967044 LGTM. Please feel free to merge and cherry-pick. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #9104: KAFKA-10266: Update the connector config header.converter
showuon commented on pull request #9104: URL: https://github.com/apache/kafka/pull/9104#issuecomment-666170759 @kkonstantine, could you help review this PR to correct the documentation? Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda merged pull request #9095: KAFKA-10321: fix infinite blocking for global stream thread startup
abbccdda merged pull request #9095: URL: https://github.com/apache/kafka/pull/9095 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah edited a comment on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
mumrah edited a comment on pull request #9008: URL: https://github.com/apache/kafka/pull/9008#issuecomment-665795947 I ran the consumer perf test (at @hachikuji's suggestion) and took a profile. Throughput was around 500MB/s on trunk and on this branch  Zoomed in a bit on the records part:  This was with only a handful of partitions on a single broker (on my laptop), but it confirms that the new FetchResponse serialization is hitting the same sendfile path as the previous code. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda merged pull request #9012: KAFKA-10270: A broker to controller channel manager
abbccdda merged pull request #9012: URL: https://github.com/apache/kafka/pull/9012 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kkonstantine commented on pull request #8854: KAFKA-10146, KAFKA-9066: Retain metrics for failed tasks (#8502)
kkonstantine commented on pull request #8854: URL: https://github.com/apache/kafka/pull/8854#issuecomment-665952918 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on pull request #9095: KAFKA-10321: fix infinite blocking for global stream thread startup
abbccdda commented on pull request #9095: URL: https://github.com/apache/kafka/pull/9095#issuecomment-665999727 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda opened a new pull request #9103: Add redirection for (Incremental)AlterConfig
abbccdda opened a new pull request #9103: URL: https://github.com/apache/kafka/pull/9103 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on pull request #8940: KAFKA-10181: AlterConfig/IncrementalAlterConfig should route to the controller for non validation calls
abbccdda commented on pull request #8940: URL: https://github.com/apache/kafka/pull/8940#issuecomment-666091752 Will close this PR as the KIP-590 requirements have changed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah opened a new pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah opened a new pull request #9100: URL: https://github.com/apache/kafka/pull/9100 WIP This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a change in pull request #9072: KAFKA-10162; Make the rate based quota behave more like a Token Bucket (KIP-599, Part III)
junrao commented on a change in pull request #9072: URL: https://github.com/apache/kafka/pull/9072#discussion_r462624834 ## File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java ## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.metrics.stats; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.kafka.common.metrics.MetricConfig; + +/** + * The {@link TokenBucket} is a {@link SampledStat} implementing a token bucket that can be used + * in conjunction with a {@link Rate} to enforce a quota. + * + * A token bucket accumulates tokens with a constant rate R, one token per 1/R second, up to a + * maximum burst size B. The burst size essentially means that we keep permission to do a unit of + * work for up to B / R seconds. + * + * The {@link TokenBucket} adapts this to fit within a {@link SampledStat}. It accumulates tokens + * in chunks of Q units by default (sample length * quota) instead of one token every 1/R second + * and expires in chunks of Q units too (when the oldest sample is expired). + * + * Internally, we achieve this behavior by not completing the current sample until we fill that + * sample up to Q (used all credits). Samples are filled up one after the others until the maximum + * number of samples is reached. If it is not possible to created a new sample, we accumulate in + * the last one until a new one can be created. The over used credits are spilled over to the new + * sample at when it is created. Every time a sample is purged, Q credits are made available. + * + * It is important to note that the maximum burst is not enforced in the class and depends on + * how the quota is enforced in the {@link Rate}. + */ +public class TokenBucket extends SampledStat { + +private final TimeUnit unit; + +/** + * Instantiates a new TokenBucket that works by default with a Quota {@link MetricConfig#quota()} + * in {@link TimeUnit#SECONDS}. + */ +public TokenBucket() { +this(TimeUnit.SECONDS); +} + +/** + * Instantiates a new TokenBucket that works with the provided time unit. + * + * @param unit The time unit of the Quota {@link MetricConfig#quota()} + */ +public TokenBucket(TimeUnit unit) { +super(0); +this.unit = unit; +} + +@Override +public void record(MetricConfig config, double value, long timeMs) { Review comment: Just a high level comment on the approach. This approach tries to spread a recorded value to multiple samples if the sample level quota is exceeded. While this matches the token bucket behavior for quota, it changes the behavior when we observe the value of the measurable. Currently, we record a full value in the current Sample. 
When we observe the value of the measurable, the effect of this value will last for the number of samples. After which, this value rolls out and no longer impacts the observed measurable. With this change, if we record a large value, the observed effect of the value could last much longer than the number of samples. For example, if we have 10 1-sec samples each with a quota of 1 and we record a value of 1000, the effect of this value will last for 1000 secs instead of 10 secs from the observability perspective. This may cause some confusion since we don't quite know when an event actually occurred. I was thinking about an alternative approach that decouples the recording/observation of the measurable from quota calculation. Here is the rough idea. We create a customized SampledStat that records new values in a single sample as it is. In addition, it maintains an accumulated available credit. As time advances, we add new credits based on the quota rate, capped by samples * perSampleQuota. When a value is recorded, we deduct the value from the credit and allow the credit to go below 0. We change the Sensor.checkQuotas() logic such that if the customized SampledStat is used, we throw QuotaViolationException if credit is < 0. This preserves the current behavior for observability, but
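A minimal sketch of that credit-based alternative, under assumed names (this is not the actual SampledStat/Sensor code): credits accrue at the quota rate up to samples * perSampleQuota, recording deducts credits and may go negative, and the quota check flags a violation whenever credits are below zero.
```java
// Hypothetical illustration of the credit-based quota idea described above.
class CreditQuotaSketch {
    private final double quotaPerSec;   // allowed units per second
    private final double maxCredits;    // cap: samples * perSampleQuota
    private double credits;
    private long lastUpdateMs;

    CreditQuotaSketch(double quotaPerSec, int numSamples, long sampleLengthMs, long nowMs) {
        this.quotaPerSec = quotaPerSec;
        this.maxCredits = quotaPerSec * (sampleLengthMs / 1000.0) * numSamples;
        this.credits = maxCredits;
        this.lastUpdateMs = nowMs;
    }

    // Record a value: deduct from the accumulated credit, allowing it to go below zero.
    void record(double value, long nowMs) {
        refill(nowMs);
        credits -= value;
    }

    // Analogous to Sensor.checkQuotas() throwing QuotaViolationException when credit < 0.
    boolean quotaViolated(long nowMs) {
        refill(nowMs);
        return credits < 0;
    }

    private void refill(long nowMs) {
        credits = Math.min(maxCredits, credits + quotaPerSec * (nowMs - lastUpdateMs) / 1000.0);
        lastUpdateMs = nowMs;
    }
}
```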
[GitHub] [kafka] dielhennr opened a new pull request #9101: KAFKA-10325: KIP-649 implementation
dielhennr opened a new pull request #9101: URL: https://github.com/apache/kafka/pull/9101 This is the initial implementation of [KIP-649](https://cwiki.apache.org/confluence/pages/resumedraft.action?draftId=158869615&draftShareId=c349fbe8-7aa8-4fde-a8e4-8d719cda3b9a&;), which implements dynamic client configuration. This is still a work in progress. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #9095: KAFKA-10321: fix infinite blocking for global stream thread startup
guozhangwang commented on a change in pull request #9095: URL: https://github.com/apache/kafka/pull/9095#discussion_r462589015 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java ## @@ -173,6 +178,18 @@ public boolean stillRunning() { } } +public boolean inErrorState() { +synchronized (stateLock) { +return state.inErrorState(); +} +} + +public boolean stillInitializing() { +synchronized (stateLock) { +return !state.isRunning() && !state.inErrorState(); Review comment: Why not just `state.CREATED` as we are excluding three out of four states here? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
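A minimal sketch of the simplification being suggested (state names are abridged and only illustrative, not the actual GlobalStreamThread class): if CREATED is the only state that is neither running nor an error state, the two negations collapse to a single equality check.
```java
// Illustrative stand-in for GlobalStreamThread's state handling.
class GlobalThreadStateSketch {
    enum State { CREATED, RUNNING, PENDING_SHUTDOWN, DEAD }

    private final Object stateLock = new Object();
    private State state = State.CREATED;

    boolean stillInitializing() {
        synchronized (stateLock) {
            // Equivalent to !state.isRunning() && !state.inErrorState() when CREATED
            // is the only remaining state, as the review comment suggests.
            return state == State.CREATED;
        }
    }
}
```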
[jira] [Commented] (KAFKA-7540) Flaky Test ConsumerBounceTest#testClose
[ https://issues.apache.org/jira/browse/KAFKA-7540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167720#comment-17167720 ] Bruno Cadonna commented on KAFKA-7540: -- https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/7618 {code:java} 20:08:12 kafka.api.ConsumerBounceTest > testClose FAILED 20:08:12 java.lang.AssertionError: Assignment did not complete on time 20:08:12 at org.junit.Assert.fail(Assert.java:89) 20:08:12 at org.junit.Assert.assertTrue(Assert.java:42) 20:08:12 at kafka.api.ConsumerBounceTest.checkClosedState(ConsumerBounceTest.scala:486) 20:08:12 at kafka.api.ConsumerBounceTest.checkCloseWithCoordinatorFailure(ConsumerBounceTest.scala:257) 20:08:12 at kafka.api.ConsumerBounceTest.testClose(ConsumerBounceTest.scala:220) {code} > Flaky Test ConsumerBounceTest#testClose > --- > > Key: KAFKA-7540 > URL: https://issues.apache.org/jira/browse/KAFKA-7540 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Affects Versions: 2.2.0 >Reporter: John Roesler >Assignee: Jason Gustafson >Priority: Critical > Labels: flaky-test > Fix For: 2.7.0, 2.6.1 > > > Observed on Java 8: > [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/17314/testReport/junit/kafka.api/ConsumerBounceTest/testClose/] > > Stacktrace: > {noformat} > java.lang.ArrayIndexOutOfBoundsException: -1 > at > kafka.integration.KafkaServerTestHarness.killBroker(KafkaServerTestHarness.scala:146) > at > kafka.api.ConsumerBounceTest.checkCloseWithCoordinatorFailure(ConsumerBounceTest.scala:238) > at kafka.api.ConsumerBounceTest.testClose(ConsumerBounceTest.scala:211) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32) > at > org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAda
[jira] [Commented] (KAFKA-9013) Flaky Test MirrorConnectorsIntegrationTest#testReplication
[ https://issues.apache.org/jira/browse/KAFKA-9013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167726#comment-17167726 ] Bruno Cadonna commented on KAFKA-9013: -- https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3608 {code:java} java.lang.RuntimeException: Could not find enough records. found 0, expected 100 at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.consume(EmbeddedKafkaCluster.java:435) at org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication(MirrorConnectorsIntegrationTest.java:221) {code} > Flaky Test MirrorConnectorsIntegrationTest#testReplication > -- > > Key: KAFKA-9013 > URL: https://issues.apache.org/jira/browse/KAFKA-9013 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Bruno Cadonna >Priority: Major > Labels: flaky-test > > h1. Stacktrace: > {code:java} > java.lang.AssertionError: Condition not met within timeout 2. Offsets not > translated downstream to primary cluster. > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:354) > at > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication(MirrorConnectorsIntegrationTest.java:239) > {code} > h1. Standard Error > {code} > Standard Error > Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource > registered in SERVER runtime does not implement any provider interfaces > applicable in the SERVER runtime. Due to constraint configuration problems > the provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will > be ignored. > Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.LoggingResource registered in > SERVER runtime does not implement any provider interfaces applicable in the > SERVER runtime. Due to constraint configuration problems the provider > org.apache.kafka.connect.runtime.rest.resources.LoggingResource will be > ignored. > Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered > in SERVER runtime does not implement any provider interfaces applicable in > the SERVER runtime. Due to constraint configuration problems the provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be > ignored. > Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.RootResource registered in > SERVER runtime does not implement any provider interfaces applicable in the > SERVER runtime. Due to constraint configuration problems the provider > org.apache.kafka.connect.runtime.rest.resources.RootResource will be ignored. > Oct 09, 2019 11:32:01 PM org.glassfish.jersey.internal.Errors logErrors > WARNING: The following warnings have been detected: WARNING: The > (sub)resource method listLoggers in > org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains > empty path annotation. > WARNING: The (sub)resource method listConnectors in > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains > empty path annotation. 
> WARNING: The (sub)resource method createConnector in > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains > empty path annotation. > WARNING: The (sub)resource method listConnectorPlugins in > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource > contains empty path annotation. > WARNING: The (sub)resource method serverInfo in > org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty > path annotation. > Oct 09, 2019 11:32:02 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource > registered in SERVER runtime does not implement any provider interfaces > applicable in the SERVER runtime. Due to constraint configuration problems > the provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will > be ignored. > Oct 09, 2019 11:32:02 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered > in SERVER runtime does not implemen
[jira] [Commented] (KAFKA-10255) Fix flaky testOneWayReplicationWithAutorOffsetSync1 test
[ https://issues.apache.org/jira/browse/KAFKA-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167727#comment-17167727 ] Bruno Cadonna commented on KAFKA-10255: --- https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3608 {code:java} java.lang.AssertionError: consumer record size is not zero expected:<0> but was:<4> {code} > Fix flaky testOneWayReplicationWithAutorOffsetSync1 test > > > Key: KAFKA-10255 > URL: https://issues.apache.org/jira/browse/KAFKA-10255 > Project: Kafka > Issue Type: Test >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > > https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk14/runs/279/log/?start=0 > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > > testOneWayReplicationWithAutorOffsetSync1 STARTED > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1 > failed, log available in > /home/jenkins/jenkins-slave/workspace/kafka-trunk-jdk14/connect/mirror/build/reports/testOutput/org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1.test.stdout > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > > testOneWayReplicationWithAutorOffsetSync1 FAILED > java.lang.AssertionError: consumer record size is not zero expected:<0> but > was:<2> > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.failNotEquals(Assert.java:835) > at org.junit.Assert.assertEquals(Assert.java:647) > at > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1(MirrorConnectorsIntegrationTest.java:349) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] cadonna commented on pull request #9087: HOTFIX: Set session timeout and heartbeat interval to default to decrease flakiness
cadonna commented on pull request #9087: URL: https://github.com/apache/kafka/pull/9087#issuecomment-666202514 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on pull request #9087: HOTFIX: Set session timeout and heartbeat interval to default to decrease flakiness
chia7712 commented on pull request #9087: URL: https://github.com/apache/kafka/pull/9087#issuecomment-666205937 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #9072: KAFKA-10162; Make the rate based quota behave more like a Token Bucket (KIP-599, Part III)
dajac commented on a change in pull request #9072: URL: https://github.com/apache/kafka/pull/9072#discussion_r462846252 ## File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java ## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.metrics.stats; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.kafka.common.metrics.MetricConfig; + +/** + * The {@link TokenBucket} is a {@link SampledStat} implementing a token bucket that can be used + * in conjunction with a {@link Rate} to enforce a quota. + * + * A token bucket accumulates tokens with a constant rate R, one token per 1/R second, up to a + * maximum burst size B. The burst size essentially means that we keep permission to do a unit of + * work for up to B / R seconds. + * + * The {@link TokenBucket} adapts this to fit within a {@link SampledStat}. It accumulates tokens + * in chunks of Q units by default (sample length * quota) instead of one token every 1/R second + * and expires in chunks of Q units too (when the oldest sample is expired). + * + * Internally, we achieve this behavior by not completing the current sample until we fill that + * sample up to Q (used all credits). Samples are filled up one after the others until the maximum + * number of samples is reached. If it is not possible to created a new sample, we accumulate in + * the last one until a new one can be created. The over used credits are spilled over to the new + * sample at when it is created. Every time a sample is purged, Q credits are made available. + * + * It is important to note that the maximum burst is not enforced in the class and depends on + * how the quota is enforced in the {@link Rate}. + */ +public class TokenBucket extends SampledStat { + +private final TimeUnit unit; + +/** + * Instantiates a new TokenBucket that works by default with a Quota {@link MetricConfig#quota()} + * in {@link TimeUnit#SECONDS}. + */ +public TokenBucket() { +this(TimeUnit.SECONDS); +} + +/** + * Instantiates a new TokenBucket that works with the provided time unit. + * + * @param unit The time unit of the Quota {@link MetricConfig#quota()} + */ +public TokenBucket(TimeUnit unit) { +super(0); +this.unit = unit; +} + +@Override +public void record(MetricConfig config, double value, long timeMs) { Review comment: @junrao Thanks for your comment. Your observation is correct. This is because we spill over the amount above the quota into any newly created sample. So @apovzner's example would actually end up like this: [5, 18]. @apovzner I think that you have mentioned once another way to do this that may not require to spill over the remainder. 
Your remark regarding the observability is right. This is true as soon as we deviate from a sampled rate, regardless of the implementation that we may choose eventually. Separating concerns is indeed a good idea if we can. Regarding your suggested approach, I think that may work. As @apovzner said, we will need to change the way the throttle time is computed so that it does not rely on the rate but on the amount of credits. Another concern is that the credits won't be observable either, so we may keep the correct rate but still not understand why one is throttled or not if we can't observe the amount of credits used/left. If we want to separate concerns, having both a sampled Rate and a Token Bucket working side by side would be the best. The Rate would continue to provide the actual Rate as of today and the Token Bucket would be used to enforce the quota. We could expose the amount of credits in the bucket via a new metric. One way to achieve this would be to have two MeasurableStats within the Sensor: the Rate and the Token Bucket. We would still need to have an adapted version of the Token Bucket that works with our current configs (quota, samples, window). I implemented one to compare with the current approach a few weeks ago: https://github.com/dajac/kafka/blob/1
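A hedged sketch of that "two stats on one sensor" idea using the public Metrics/Sensor API; the TokenBucket here is the stat from this PR, and attaching the quota only to it (while the Rate stays quota-free for observability) is one possible wiring, not the final design.
```java
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Quota;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.TokenBucket;

public class SideBySideQuotaSketch {
    public static void main(String[] args) {
        Metrics metrics = new Metrics();
        Sensor sensor = metrics.sensor("produce-quota-demo");

        // Sampled Rate kept purely for observability: no quota attached to this stat.
        sensor.add(metrics.metricName("byte-rate", "quota-demo", "observed rate"), new Rate());

        // TokenBucket (the stat from this PR) carries the quota and exposes its credits
        // as a metric; exactly how checkQuotas() should treat it is what is being discussed.
        sensor.add(metrics.metricName("credits", "quota-demo", "token bucket credits"),
                   new TokenBucket(),
                   new MetricConfig().quota(Quota.upperBound(1000)));

        sensor.record(500);  // quota checks run inside record()
        metrics.close();
    }
}
```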
[GitHub] [kafka] mimaison commented on pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()
mimaison commented on pull request #9007: URL: https://github.com/apache/kafka/pull/9007#issuecomment-666239450 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()
mimaison commented on pull request #9007: URL: https://github.com/apache/kafka/pull/9007#issuecomment-666256788 ok to test This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a change in pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics
cadonna commented on a change in pull request #9094: URL: https://github.com/apache/kafka/pull/9094#discussion_r462881933 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java ## @@ -668,6 +671,9 @@ private void checkKeyValueStoreMetrics(final String group0100To24, checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_CURRENT, 0); checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_AVG, 0); checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_MAX, 0); +checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_AVG, expectedNumberofE2ELatencyMetrics); +checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MIN, expectedNumberofE2ELatencyMetrics); +checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MAX, expectedNumberofE2ELatencyMetrics); Review comment: Sorry, I did a mistake here. We should not give new metrics old groups. I think to fix this test you need to adapt the filter on line 618 to let all metrics with groups that relate to KV state stores pass. See `checkWindowStoreAndSuppressionBufferMetrics()` for an example. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a change in pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics
cadonna commented on a change in pull request #9094: URL: https://github.com/apache/kafka/pull/9094#discussion_r462882926 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java ## @@ -443,6 +447,25 @@ public static Sensor suppressionBufferSizeSensor(final String threadId, ); } +public static Sensor e2ELatencySensor(final String threadId, + final String taskId, + final String storeType, + final String storeName, + final StreamsMetricsImpl streamsMetrics) { +final Sensor sensor = streamsMetrics.storeLevelSensor(threadId, taskId, storeName, RECORD_E2E_LATENCY, RecordingLevel.TRACE); +final Map tagMap = streamsMetrics.storeLevelTagMap(threadId, taskId, storeType, storeName); +addAvgAndMinAndMaxToSensor( +sensor, +STATE_STORE_LEVEL_GROUP, Review comment: I just realized that we should not put new metrics into old groups. Your code is fine. Do not use `stateStoreLevelGroup()`! Sorry for the confusion. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a change in pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics
cadonna commented on a change in pull request #9094: URL: https://github.com/apache/kafka/pull/9094#discussion_r462883347 ## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetricsTest.java ## @@ -327,6 +327,38 @@ public void shouldGetExpiredWindowRecordDropSensor() { assertThat(sensor, is(expectedSensor)); } +@Test +public void shouldGetRecordE2ELatencySensor() { +final String metricName = "record-e2e-latency"; + +final String e2eLatencyDescription = +"end-to-end latency of a record, measuring by comparing the record timestamp with the " ++ "system time when it has been fully processed by the node"; +final String descriptionOfAvg = "The average " + e2eLatencyDescription; +final String descriptionOfMin = "The minimum " + e2eLatencyDescription; +final String descriptionOfMax = "The maximum " + e2eLatencyDescription; + +expect(streamsMetrics.storeLevelSensor(THREAD_ID, TASK_ID, STORE_NAME, metricName, RecordingLevel.TRACE)) +.andReturn(expectedSensor); +expect(streamsMetrics.storeLevelTagMap(THREAD_ID, TASK_ID, STORE_TYPE, STORE_NAME)).andReturn(storeTagMap); +StreamsMetricsImpl.addAvgAndMinAndMaxToSensor( +expectedSensor, +STORE_LEVEL_GROUP, Review comment: I just realized that we should not put new metrics into old groups. Your code is fine. Do not use instance variable `storeLevelGroup`! Sorry for the confusion. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol
mimaison commented on pull request #8295: URL: https://github.com/apache/kafka/pull/8295#issuecomment-666310163 @dajac Ok, I've removed the `Optional` to `Option` changes This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on pull request #9087: HOTFIX: Set session timeout and heartbeat interval to default to decrease flakiness
cadonna commented on pull request #9087: URL: https://github.com/apache/kafka/pull/9087#issuecomment-666311943 Java 14 failed due to the following failing test: ``` org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest > shouldUpgradeFromEosAlphaToEosBeta[true] ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10327) Make flush after some count of putted records in SinkTask
Pavel Kuznetsov created KAFKA-10327: --- Summary: Make flush after some count of putted records in SinkTask Key: KAFKA-10327 URL: https://issues.apache.org/jira/browse/KAFKA-10327 Project: Kafka Issue Type: Improvement Components: KafkaConnect Affects Versions: 2.5.0 Reporter: Pavel Kuznetsov In the current version of Kafka Connect, all records accumulated with the SinkTask.put method are flushed to the target system in a time-based manner. So data is flushed and offsets are committed every offset.flush.timeout.ms (default is 6) ms. But you can't control the number of messages you receive from Kafka between two flushes. It may cause out-of-memory errors, because the in-memory buffer may grow a lot. I suggest adding out-of-the-box support for count-based flushes to Kafka Connect. It requires a new configuration parameter (offset.flush.count, for example). The number of records sent to SinkTask.put would be counted, and if that amount is greater than offset.flush.count's value, SinkTask.flush is called and offsets are committed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
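A rough, hypothetical sketch of the proposed count-based trigger (names such as offsetFlushCount and recordBatchAndCheck are illustrative, not part of the Connect framework): count records handed to SinkTask.put and request a flush once the threshold is crossed, in addition to the existing timer.
```java
// Illustrative only: a counter a Connect worker could keep alongside its flush timer.
// offset.flush.count is the proposed (not yet existing) configuration from this ticket.
public class CountBasedFlushSketch {
    private final int offsetFlushCount;      // proposed offset.flush.count value
    private long recordsSinceLastFlush = 0;

    public CountBasedFlushSketch(int offsetFlushCount) {
        this.offsetFlushCount = offsetFlushCount;
    }

    // Called after each batch is passed to SinkTask.put(records)
    public boolean recordBatchAndCheck(int batchSize) {
        recordsSinceLastFlush += batchSize;
        // Trigger SinkTask.flush()/offset commit when the count threshold is reached,
        // independently of the time-based trigger.
        return recordsSinceLastFlush >= offsetFlushCount;
    }

    // Called once the flush and offset commit have completed
    public void reset() {
        recordsSinceLastFlush = 0;
    }
}
```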
[GitHub] [kafka] chia7712 commented on pull request #9087: HOTFIX: Set session timeout and heartbeat interval to default to decrease flakiness
chia7712 commented on pull request #9087: URL: https://github.com/apache/kafka/pull/9087#issuecomment-666325276 ```EosBetaUpgradeIntegrationTest``` pass on my local. retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on pull request #9102: KAFKA-10326 Both serializer and deserializer should be able to see th…
chia7712 commented on pull request #9102: URL: https://github.com/apache/kafka/pull/9102#issuecomment-666325552 ```EosBetaUpgradeIntegrationTest``` is flaky... retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()
mimaison commented on pull request #9007: URL: https://github.com/apache/kafka/pull/9007#issuecomment-666353178 Failures look unrelated: - org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest > shouldUpgradeFromEosAlphaToEosBeta[true] FAILED - org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > testOneWayReplicationWithAutorOffsetSync1 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison merged pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()
mimaison merged pull request #9007: URL: https://github.com/apache/kafka/pull/9007 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-10120) DescribeLogDirsResult exposes internal classes
[ https://issues.apache.org/jira/browse/KAFKA-10120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison resolved KAFKA-10120. Fix Version/s: 2.7 Resolution: Fixed > DescribeLogDirsResult exposes internal classes > -- > > Key: KAFKA-10120 > URL: https://issues.apache.org/jira/browse/KAFKA-10120 > Project: Kafka > Issue Type: Bug >Reporter: Tom Bentley >Assignee: Tom Bentley >Priority: Minor > Fix For: 2.7 > > > DescribeLogDirsResult (returned by AdminClient#describeLogDirs(Collection)) > exposes a number of internal types: > * {{DescribeLogDirsResponse.LogDirInfo}} > * {{DescribeLogDirsResponse.ReplicaInfo}} > * {{Errors}} > {{}} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mimaison commented on a change in pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test
mimaison commented on a change in pull request #9029: URL: https://github.com/apache/kafka/pull/9029#discussion_r462989546 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ## @@ -345,15 +342,24 @@ private void waitForConsumerGroupOffsetSync(Consumer consumer, L }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not complete in time"); } +private void waitForConsumingAllRecords(Consumer consumer) throws InterruptedException { +final AtomicInteger totalConsumedRecords = new AtomicInteger(0); +waitForCondition(() -> { +ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); +consumer.commitSync(); +return NUM_RECORDS_PRODUCED == totalConsumedRecords.addAndGet(records.count()); +}, RECORD_CONSUME_DURATION_MS, "Consumer cannot consume all the records in time"); +} + @Test public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedException { // create consumers before starting the connectors so we don't need to wait for discovery -Consumer consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap( -"group.id", "consumer-group-1"), "test-topic-1"); -consumer1.poll(Duration.ofMillis(500)); -consumer1.commitSync(); -consumer1.close(); +try (Consumer consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap( +"group.id", "consumer-group-1"), "test-topic-1")) { +// we need to wait for consuming all the records for MM2 replicaing the expected offsets Review comment: `replicaing` -> `replicating` ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ## @@ -345,15 +342,24 @@ private void waitForConsumerGroupOffsetSync(Consumer consumer, L }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not complete in time"); } +private void waitForConsumingAllRecords(Consumer consumer) throws InterruptedException { +final AtomicInteger totalConsumedRecords = new AtomicInteger(0); +waitForCondition(() -> { +ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); Review comment: Can we add the types `` to `ConsumerRecords`? 
## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ## @@ -387,11 +393,11 @@ public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedExceptio } // create a consumer at primary cluster to consume the new topic -consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap( -"group.id", "consumer-group-1"), "test-topic-2"); -consumer1.poll(Duration.ofMillis(500)); -consumer1.commitSync(); -consumer1.close(); +try (Consumer consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap( +"group.id", "consumer-group-1"), "test-topic-2")) { +// we need to wait for consuming all the records for MM2 replicaing the expected offsets Review comment: `replicaing` -> `replicating` ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ## @@ -345,15 +342,24 @@ private void waitForConsumerGroupOffsetSync(Consumer consumer, L }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not complete in time"); } +private void waitForConsumingAllRecords(Consumer consumer) throws InterruptedException { +final AtomicInteger totalConsumedRecords = new AtomicInteger(0); +waitForCondition(() -> { +ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); +consumer.commitSync(); Review comment: We can move that line after the `waitForCondition()` block to just commit once all records have been consumed. ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ## @@ -345,15 +342,24 @@ private void waitForConsumerGroupOffsetSync(Consumer consumer, L }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not complete in time"); } +private void waitForConsumingAllRecords(Consumer consumer) throws InterruptedException { +final AtomicInteger totalConsumedRecords = new AtomicInteger(0); +waitForCondition(() -> { +ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); +consumer.commitSync(); +return NUM_RECORDS_PRODUCED == totalConsumedRecords.addAndGet(records.count()); +}, RECORD_CONSUME_DURATION_MS, "Consumer cannot consume all the records in time"); Review comment: nit: The
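For context, a sketch of how the helper might read once both suggestions are applied, i.e. typed `ConsumerRecords` and a single `commitSync()` after the wait completes. The `byte[]` generics, the constants, and `waitForCondition` are assumed from the surrounding test class, so this is only an approximation of the final change, not the merged code.

```java
private void waitForConsumingAllRecords(Consumer<byte[], byte[]> consumer) throws InterruptedException {
    final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
    waitForCondition(() -> {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
        return NUM_RECORDS_PRODUCED == totalConsumedRecords.addAndGet(records.count());
    }, RECORD_CONSUME_DURATION_MS, "Consumer cannot consume all the records in time");
    // commit once, after the expected number of records has been consumed
    consumer.commitSync();
}
```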
[jira] [Assigned] (KAFKA-10048) Possible data gap for a consumer after a failover when using MM2
[ https://issues.apache.org/jira/browse/KAFKA-10048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison reassigned KAFKA-10048: -- Assignee: Andre Araujo > Possible data gap for a consumer after a failover when using MM2 > > > Key: KAFKA-10048 > URL: https://issues.apache.org/jira/browse/KAFKA-10048 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.5.0 >Reporter: Andre Araujo >Assignee: Andre Araujo >Priority: Major > > I've been looking at some MM2 scenarios and identified a situation where > consumers can miss consuming some data in the event of a failover. > > When a consumer subscribes to a topic for the first time and commits offsets, > the offsets for every existing partition of that topic will be saved to the > cluster's {{__consumer_offset}} topic. Even if a partition is completely > empty, the offset {{0}} will still be saved for the consumer's consumer group. > > When MM2 is replicating the checkpoints to the remote cluster, though, it > [ignores anything that has an offset equal to > zero|https://github.com/apache/kafka/blob/856e36651203b03bf9a6df2f2d85a356644cbce3/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L135], > replicating offsets only for partitions that contain data. > > This can lead to a gap in the data consumed by consumers in the following > scenario: > # Topic is created on the source cluster. > # MM2 is configured to replicate the topic and consumer groups > # Producer starts to produce data to the source topic but for some reason > some partitions do not get data initially, while others do (skewed keyed > messages or bad luck) > # Consumers start to consume data from that topic and their consumer groups' > offsets are replicated to the target cluster, *but only for partitions that > contain data*. The consumers are using the default setting auto.offset.reset > = latest. > # A consumer failover to the second cluster is performed (for whatever > reason), and the offset translation steps are completed. The consumers are not > restarted yet. > # The producers continue to produce data to the source cluster topic and now > produce data to the partitions that were empty before. > # *After* the producers start producing data, consumers are started on the > target cluster and start consuming. > For the partitions that already had data before the failover, everything > works fine. The consumer offsets will have been translated correctly and the > consumers will start consuming from the correct position. > For the partitions that were empty before the failover, though, any data > written by the producers to those partitions *after the failover but before > the consumers start* will be completely missed, since the consumers will jump > straight to the latest offset when they start due to the lack of a zero > offset stored locally on the target cluster. -- This message was sent by Atlassian Jira (v8.3.4#803005)
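To make the reported behaviour concrete, here is a small self-contained sketch. It is not the actual MirrorCheckpointTask code (see the link above for that); it only mimics the described filter, where committed offsets equal to zero are dropped, so an empty partition never gets a checkpoint on the target cluster.

```java
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ZeroOffsetCheckpointExample {

    /**
     * Illustrative only: keep only committed offsets greater than zero, mirroring the
     * behaviour described above. Partitions whose committed offset is exactly 0 are
     * never checkpointed, so no translated offset exists for them after a failover.
     */
    static Map<TopicPartition, OffsetAndMetadata> checkpointableOffsets(
            Map<TopicPartition, OffsetAndMetadata> committedOffsets) {
        return committedOffsets.entrySet().stream()
                .filter(e -> e.getValue() != null && e.getValue().offset() > 0)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
}
```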
[jira] [Assigned] (KAFKA-10048) Possible data gap for a consumer after a failover when using MM2
[ https://issues.apache.org/jira/browse/KAFKA-10048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison reassigned KAFKA-10048: -- Assignee: (was: Mickael Maison) > Possible data gap for a consumer after a failover when using MM2 > > > Key: KAFKA-10048 > URL: https://issues.apache.org/jira/browse/KAFKA-10048 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.5.0 >Reporter: Andre Araujo >Priority: Major > > I've been looking at some MM2 scenarios and identified a situation where > consumers can miss consuming some data in the event of a failover. > > When a consumer subscribes to a topic for the first time and commits offsets, > the offsets for every existing partition of that topic will be saved to the > cluster's {{__consumer_offset}} topic. Even if a partition is completely > empty, the offset {{0}} will still be saved for the consumer's consumer group. > > When MM2 is replicating the checkpoints to the remote cluster, though, it > [ignores anything that has an offset equal to > zero|https://github.com/apache/kafka/blob/856e36651203b03bf9a6df2f2d85a356644cbce3/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L135], > replicating offsets only for partitions that contain data. > > This can lead to a gap in the data consumed by consumers in the following > scenario: > # Topic is created on the source cluster. > # MM2 is configured to replicate the topic and consumer groups > # Producer starts to produce data to the source topic but for some reason > some partitions do not get data initially, while others do (skewed keyed > messages or bad luck) > # Consumers start to consume data from that topic and their consumer groups' > offsets are replicated to the target cluster, *but only for partitions that > contain data*. The consumers are using the default setting auto.offset.reset > = latest. > # A consumer failover to the second cluster is performed (for whatever > reason), and the offset translation steps are completed. The consumers are not > restarted yet. > # The producers continue to produce data to the source cluster topic and now > produce data to the partitions that were empty before. > # *After* the producers start producing data, consumers are started on the > target cluster and start consuming. > For the partitions that already had data before the failover, everything > works fine. The consumer offsets will have been translated correctly and the > consumers will start consuming from the correct position. > For the partitions that were empty before the failover, though, any data > written by the producers to those partitions *after the failover but before > the consumers start* will be completely missed, since the consumers will jump > straight to the latest offset when they start due to the lack of a zero > offset stored locally on the target cluster. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-10048) Possible data gap for a consumer after a failover when using MM2
[ https://issues.apache.org/jira/browse/KAFKA-10048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison reassigned KAFKA-10048: -- Assignee: Mickael Maison > Possible data gap for a consumer after a failover when using MM2 > > > Key: KAFKA-10048 > URL: https://issues.apache.org/jira/browse/KAFKA-10048 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.5.0 >Reporter: Andre Araujo >Assignee: Mickael Maison >Priority: Major > > I've been looking at some MM2 scenarios and identified a situation where > consumers can miss consuming some data in the event of a failover. > > When a consumer subscribes to a topic for the first time and commits offsets, > the offsets for every existing partition of that topic will be saved to the > cluster's {{__consumer_offset}} topic. Even if a partition is completely > empty, the offset {{0}} will still be saved for the consumer's consumer group. > > When MM2 is replicating the checkpoints to the remote cluster, though, it > [ignores anything that has an offset equal to > zero|https://github.com/apache/kafka/blob/856e36651203b03bf9a6df2f2d85a356644cbce3/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L135], > replicating offsets only for partitions that contain data. > > This can lead to a gap in the data consumed by consumers in the following > scenario: > # Topic is created on the source cluster. > # MM2 is configured to replicate the topic and consumer groups > # Producer starts to produce data to the source topic but for some reason > some partitions do not get data initially, while others do (skewed keyed > messages or bad luck) > # Consumers start to consume data from that topic and their consumer groups' > offsets are replicated to the target cluster, *but only for partitions that > contain data*. The consumers are using the default setting auto.offset.reset > = latest. > # A consumer failover to the second cluster is performed (for whatever > reason), and the offset translation steps are completed. The consumers are not > restarted yet. > # The producers continue to produce data to the source cluster topic and now > produce data to the partitions that were empty before. > # *After* the producers start producing data, consumers are started on the > target cluster and start consuming. > For the partitions that already had data before the failover, everything > works fine. The consumer offsets will have been translated correctly and the > consumers will start consuming from the correct position. > For the partitions that were empty before the failover, though, any data > written by the producers to those partitions *after the failover but before > the consumers start* will be completely missed, since the consumers will jump > straight to the latest offset when they start due to the lack of a zero > offset stored locally on the target cluster. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mumrah commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
mumrah commented on pull request #9008: URL: https://github.com/apache/kafka/pull/9008#issuecomment-666374043 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r463026843 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java ## @@ -132,16 +135,19 @@ final boolean stateCreated, final StoreBuilder storeBuilder, final Windows windows, + final SlidingWindows slidingWindows, final SessionWindows sessionWindows, final Merger sessionMerger) { final ProcessorSupplier kStreamAggregate; -if (windows == null && sessionWindows == null) { +if (windows == null && slidingWindows == null && sessionWindows == null) { kStreamAggregate = new KStreamAggregate<>(storeBuilder.name(), initializer, aggregator); -} else if (windows != null && sessionWindows == null) { +} else if (windows != null && slidingWindows == null && sessionWindows == null) { kStreamAggregate = new KStreamWindowAggregate<>(windows, storeBuilder.name(), initializer, aggregator); -} else if (windows == null && sessionMerger != null) { +} else if (windows == null && slidingWindows != null && sessionWindows == null) { +kStreamAggregate = new KStreamSlidingWindowAggregate<>(slidingWindows, storeBuilder.name(), initializer, aggregator); +} else if (windows == null && slidingWindows == null && sessionMerger != null) { Review comment: The original just had the check for `sessionMerger != null`, are there scenarios where the sessionMerger would be null but the sessionWindows wouldn't? I did think it was kind of inconsistent to check 'sessionMerger' just that one time and check 'sessionWindows' the other times so maybe it was a mistake This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test
showuon commented on a change in pull request #9029: URL: https://github.com/apache/kafka/pull/9029#discussion_r463028636 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ## @@ -345,15 +342,24 @@ private void waitForConsumerGroupOffsetSync(Consumer consumer, L }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not complete in time"); } +private void waitForConsumingAllRecords(Consumer consumer) throws InterruptedException { +final AtomicInteger totalConsumedRecords = new AtomicInteger(0); +waitForCondition(() -> { +ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); +consumer.commitSync(); Review comment: Good suggestion! Thanks. ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ## @@ -345,15 +342,24 @@ private void waitForConsumerGroupOffsetSync(Consumer consumer, L }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not complete in time"); } +private void waitForConsumingAllRecords(Consumer consumer) throws InterruptedException { +final AtomicInteger totalConsumedRecords = new AtomicInteger(0); +waitForCondition(() -> { +ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); Review comment: Done. Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test
showuon commented on pull request #9029: URL: https://github.com/apache/kafka/pull/9029#issuecomment-666395292 @mimaison, thanks for your good suggestion! I've updated it in this commit: https://github.com/apache/kafka/pull/9029/commits/8bc4a543dda6ddd90d752f7e6a64c63d85a1de3f. Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r463036048 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.HashSet; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer initializer; +private final Aggregator aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { +this.windows = windows; +this.storeName = storeName; +this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { Review comment: done! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on pull request #8657: URL: https://github.com/apache/kafka/pull/8657#issuecomment-666398632 > 2 system test failures in the latest PR Those 2 failed tests are flaky on my local machine, and there are existing issues/PRs related to them. @junrao @ijuma @hachikuji @rajinisivaram please take a look. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r463039985 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java ## @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Named; +import org.apache.kafka.streams.kstream.Reducer; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.kstream.TimeWindowedKStream; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; +import org.apache.kafka.streams.state.WindowStore; + +import java.time.Duration; +import java.util.Objects; +import java.util.Set; + +import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME; +import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME; + +public class SlidingWindowedKStreamImpl extends AbstractStream implements TimeWindowedKStream { +private final SlidingWindows windows; +private final GroupedStreamAggregateBuilder aggregateBuilder; + +SlidingWindowedKStreamImpl(final SlidingWindows windows, +final InternalStreamsBuilder builder, +final Set subTopologySourceNodes, +final String name, +final Serde keySerde, +final Serde valueSerde, +final GroupedStreamAggregateBuilder aggregateBuilder, +final StreamsGraphNode streamsGraphNode) { +super(name, keySerde, valueSerde, subTopologySourceNodes, streamsGraphNode, builder); +this.windows = Objects.requireNonNull(windows, "windows can't be null"); +this.aggregateBuilder = aggregateBuilder; +} + +@Override +public KTable, Long> count() { +return count(NamedInternal.empty()); +} + +@Override +public KTable, Long> count(final Named named) { +return doCount(named, Materialized.with(keySerde, Serdes.Long())); +} + + +@Override +public KTable, Long> count(final Materialized> materialized) { +return count(NamedInternal.empty(), materialized); +} + +@Override +public KTable, Long> count(final Named named, final Materialized> materialized) { 
+Objects.requireNonNull(materialized, "materialized can't be null"); + +// TODO: remove this when we do a topology-incompatible release +// we used to burn a topology name here, so we have to keep doing it for compatibility +if (new MaterializedInternal<>(materialized).storeName() == null) { +builder.newStoreName(AGGREGATE_NAME); +} + +return doCount(named, materialized); +} + +private KTable, Long> doCount(final Named named, + final Materialized> materialized) { +final MaterializedInternal> materializedInternal = +new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME); + +if (materializedInternal.keySerde() == null) { +materializedInternal.withKeySerde(keySerde); +} +if (materializedInternal.valueSerde() == null) { +materializedInternal.withValueSerde(Serdes.Long()); +} + +final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); + +return aggregateBu
[GitHub] [kafka] dajac commented on pull request #9091: MINOR; Make KafkaAdminClientTest.testDescribeLogDirsPartialFailure and KafkaAdminClientTest.testAlterReplicaLogDirsPartialFailure test more reli
dajac commented on pull request #9091: URL: https://github.com/apache/kafka/pull/9091#issuecomment-666402566 rebased to fix conflicts. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
ijuma commented on pull request #8657: URL: https://github.com/apache/kafka/pull/8657#issuecomment-666403581 @junrao Thanks for the summary. With regards to the thread pool option, would this be used for completion of delayed operations for the group coordinator only? When it comes to tuning and monitoring, you're thinking we'd have to introduce a config for the number of threads and an `idle` metric? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
ijuma commented on a change in pull request #8657: URL: https://github.com/apache/kafka/pull/8657#discussion_r463044371 ## File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala ## @@ -239,9 +239,13 @@ class GroupMetadataManager(brokerId: Int, } } + /** + * @return Returning a map of successfully appended topic partitions and a flag indicting whether the HWM has been + * incremented. The caller ought to complete delayed requests for those returned partitions. + */ def storeGroup(group: GroupMetadata, groupAssignment: Map[String, Array[Byte]], - responseCallback: Errors => Unit): Unit = { + responseCallback: Errors => Unit): Map[TopicPartition, LeaderHwChange] = { Review comment: I raised the point before that it's a bit unusual and unintuitive to have both a callback and a return value. Any thoughts on this? ## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ## @@ -369,6 +370,32 @@ class GroupCoordinator(val brokerId: Int, } } + /** + * try to complete produce, fetch and delete requests if the HW of partition is incremented. Otherwise, we try to complete + * only delayed fetch requests. + * + * Noted that this method may hold a lot of group lock so the caller should NOT hold any group lock + * in order to avoid deadlock + * @param topicPartitions a map contains the partition and a flag indicting whether the HWM has been changed + */ + private[group] def completeDelayedRequests(topicPartitions: Map[TopicPartition, LeaderHwChange]): Unit = +topicPartitions.foreach { + case (tp, leaderHWIncremented) => leaderHWIncremented match { +case LeaderHwIncremented$ => groupManager.replicaManager.completeDelayedRequests(tp) Review comment: I notice that we are including the `$` here and in a few other places, we should not do that. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r463046306 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.HashSet; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer initializer; +private final Aggregator aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { +this.windows = windows; +this.storeName = storeName; +this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { +return windows; +} + +@Override +public void enableSendingOldValues() { +sendOldValues = true; +} + + +private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor { +private TimestampedWindowStore windowStore; +private TimestampedTupleForwarder, Agg> tupleForwarder; 
+private StreamsMetricsImpl metrics; +private InternalProcessorContext internalProcessorContext; +private Sensor lateRecordDropSensor; +private Sensor droppedRecordsSensor; +private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; +private boolean reverseIteratorImplemented = false; + +@SuppressWarnings("unchecked") +@Override +public void init(final ProcessorContext context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext) context; +metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( +threadId, +context.taskId().toString(), +internalProcessorContext.currentNode().name(), +metrics +); +//catch unsupported operation error +droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); +
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r463047237 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.HashSet; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer initializer; +private final Aggregator aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { +this.windows = windows; +this.storeName = storeName; +this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { +return windows; +} + +@Override +public void enableSendingOldValues() { +sendOldValues = true; +} + + +private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor { +private TimestampedWindowStore windowStore; +private TimestampedTupleForwarder, Agg> tupleForwarder; 
+private StreamsMetricsImpl metrics; +private InternalProcessorContext internalProcessorContext; +private Sensor lateRecordDropSensor; +private Sensor droppedRecordsSensor; +private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; +private boolean reverseIteratorImplemented = false; + +@SuppressWarnings("unchecked") +@Override +public void init(final ProcessorContext context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext) context; +metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( +threadId, +context.taskId().toString(), +internalProcessorContext.currentNode().name(), +metrics +); +//catch unsupported operation error +droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); +
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r463056420 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.HashSet; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer initializer; +private final Aggregator aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { +this.windows = windows; +this.storeName = storeName; +this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { +return windows; +} + +@Override +public void enableSendingOldValues() { +sendOldValues = true; +} + + +private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor { +private TimestampedWindowStore windowStore; +private TimestampedTupleForwarder, Agg> tupleForwarder; 
+private StreamsMetricsImpl metrics; +private InternalProcessorContext internalProcessorContext; +private Sensor lateRecordDropSensor; +private Sensor droppedRecordsSensor; +private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; +private boolean reverseIteratorImplemented = false; + +@SuppressWarnings("unchecked") +@Override +public void init(final ProcessorContext context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext) context; +metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( +threadId, +context.taskId().toString(), +internalProcessorContext.currentNode().name(), +metrics +); +//catch unsupported operation error +droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); +
[GitHub] [kafka] dajac commented on a change in pull request #9092: KAFKA-10163; Define `controller_mutation_rate` as a Double instead of a Long
dajac commented on a change in pull request #9092: URL: https://github.com/apache/kafka/pull/9092#discussion_r463056307 ## File path: core/src/main/scala/kafka/server/DynamicConfig.scala ## @@ -103,7 +103,7 @@ object DynamicConfig { .define(ProducerByteRateOverrideProp, LONG, DefaultProducerOverride, MEDIUM, ProducerOverrideDoc) .define(ConsumerByteRateOverrideProp, LONG, DefaultConsumerOverride, MEDIUM, ConsumerOverrideDoc) .define(RequestPercentageOverrideProp, DOUBLE, DefaultRequestOverride, MEDIUM, RequestOverrideDoc) - .define(ControllerMutationOverrideProp, LONG, DefaultConsumerOverride, MEDIUM, ControllerMutationOverrideDoc) + .define(ControllerMutationOverrideProp, DOUBLE, DefaultConsumerOverride, MEDIUM, ControllerMutationOverrideDoc) Review comment: good point. i have changed this to use `Int.MaxValue.toDouble` by default like we do for the request quota. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei merged pull request #9097: KAFKA-10319: Skip unknown offsets when computing sum of changelog offsets
vvcephei merged pull request #9097: URL: https://github.com/apache/kafka/pull/9097 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram commented on pull request #9092: KAFKA-10163; Define `controller_mutation_rate` as a Double instead of a Long
rajinisivaram commented on pull request #9092: URL: https://github.com/apache/kafka/pull/9092#issuecomment-666428695 ok to test This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram commented on pull request #9092: KAFKA-10163; Define `controller_mutation_rate` as a Double instead of a Long
rajinisivaram commented on pull request #9092: URL: https://github.com/apache/kafka/pull/9092#issuecomment-666429262 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r463063406 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.HashSet; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer initializer; +private final Aggregator aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { +this.windows = windows; +this.storeName = storeName; +this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { +return windows; +} + +@Override +public void enableSendingOldValues() { +sendOldValues = true; +} + + +private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor { +private TimestampedWindowStore windowStore; +private TimestampedTupleForwarder, Agg> tupleForwarder; 
+private StreamsMetricsImpl metrics; +private InternalProcessorContext internalProcessorContext; +private Sensor lateRecordDropSensor; +private Sensor droppedRecordsSensor; +private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; +private boolean reverseIteratorImplemented = false; + +@SuppressWarnings("unchecked") +@Override +public void init(final ProcessorContext context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext) context; +metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( +threadId, +context.taskId().toString(), +internalProcessorContext.currentNode().name(), +metrics +); +//catch unsupported operation error +droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); +
[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on a change in pull request #8657: URL: https://github.com/apache/kafka/pull/8657#discussion_r463066191 ## File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala ## @@ -239,9 +239,13 @@ class GroupMetadataManager(brokerId: Int, } } + /** + * @return Returning a map of successfully appended topic partitions and a flag indicting whether the HWM has been + * incremented. The caller ought to complete delayed requests for those returned partitions. + */ def storeGroup(group: GroupMetadata, groupAssignment: Map[String, Array[Byte]], - responseCallback: Errors => Unit): Unit = { + responseCallback: Errors => Unit): Map[TopicPartition, LeaderHwChange] = { Review comment: The response was https://github.com/apache/kafka/pull/8657#discussion_r452754165 In short, we should have a way of fetching delayed request from partition instead of using return value to carry them. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on a change in pull request #8657: URL: https://github.com/apache/kafka/pull/8657#discussion_r463066323 ## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ## @@ -369,6 +370,32 @@ class GroupCoordinator(val brokerId: Int, } } + /** + * try to complete produce, fetch and delete requests if the HW of partition is incremented. Otherwise, we try to complete + * only delayed fetch requests. + * + * Noted that this method may hold a lot of group lock so the caller should NOT hold any group lock + * in order to avoid deadlock + * @param topicPartitions a map contains the partition and a flag indicting whether the HWM has been changed + */ + private[group] def completeDelayedRequests(topicPartitions: Map[TopicPartition, LeaderHwChange]): Unit = +topicPartitions.foreach { + case (tp, leaderHWIncremented) => leaderHWIncremented match { +case LeaderHwIncremented$ => groupManager.replicaManager.completeDelayedRequests(tp) Review comment: will copy that! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r463073842
## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
## @@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String storeName;
+    private final SlidingWindows windows;
+    private final Initializer<Agg> initializer;
+    private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+                                         final String storeName,
+                                         final Initializer<Agg> initializer,
+                                         final Aggregator<? super K, ? super V, Agg> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamSlidingWindowAggregateProcessor();
+    }
+
+    public SlidingWindows windows() {
+        return windows;
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+
+    private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> {
+        private TimestampedWindowStore<K, Agg> windowStore;
+        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+        private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
+        private Sensor droppedRecordsSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+        private boolean reverseIteratorImplemented = false;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            super.init(context);
+            internalProcessorContext = (InternalProcessorContext) context;
+            metrics = internalProcessorContext.metrics();
+            final String threadId = Thread.currentThread().getName();
+            lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+                threadId,
+                context.taskId().toString(),
+                internalProcessorContext.currentNode().name(),
+                metrics
+            );
+            // catch unsupported operation error
+            droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics);
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r463076565 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java [quotes the same excerpt of KStreamSlidingWindowAggregate.java as the comment above]
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r463077196 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java [quotes the same excerpt of KStreamSlidingWindowAggregate.java as the comment above]
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r463078058 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java [quotes the same excerpt of KStreamSlidingWindowAggregate.java as the comment above]
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r463083346 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java [quotes the same excerpt of KStreamSlidingWindowAggregate.java as the comment above]
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r463083821 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java [quotes the same excerpt of KStreamSlidingWindowAggregate.java as the comment above]
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r463088582 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java [quotes the same excerpt of KStreamSlidingWindowAggregate.java as the comment above]
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r463091437 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java [quotes the same excerpt of KStreamSlidingWindowAggregate.java as the comment above]
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r463092579 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java [quotes the same excerpt of KStreamSlidingWindowAggregate.java as the comment above]
[jira] [Commented] (KAFKA-10283) Consolidate client-level and consumer-level assignment within ClientState
[ https://issues.apache.org/jira/browse/KAFKA-10283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168029#comment-17168029 ] highluck commented on KAFKA-10283: -- [~guozhang] Can I pick up this issue? Is there any structure you have in mind? > Consolidate client-level and consumer-level assignment within ClientState > - > > Key: KAFKA-10283 > URL: https://issues.apache.org/jira/browse/KAFKA-10283 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Priority: Major > Labels: newbie++ > > In StreamsPartitionAssignor, we do a two-level assignment: first on the > client level, and then, after that assignment is done, we further decide within > the client how to distribute the tasks among its consumers if there are more than one. > The {{ClientState}} class is used for book-keeping the assigned tasks, > however it is only used for the first level, while the second level is > handled outside of the class and we only keep track of the results in a few maps > for logging purposes. This leaves us with a bunch of hierarchical maps, e.g. > some on the client level and some on the consumer level. > We would like to consolidate some of these maps into a single data structure > to better keep track of the assignment information, and to make it harder for bugs > to leave the assignment information inconsistent. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mimaison commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test
mimaison commented on pull request #9029: URL: https://github.com/apache/kafka/pull/9029#issuecomment-666512651 ok to test This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording
vvcephei commented on a change in pull request #9098: URL: https://github.com/apache/kafka/pull/9098#discussion_r463128398 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ## @@ -181,23 +181,39 @@ void openDB(final ProcessorContext context) { throw new ProcessorStateException(fatal); } -// Setup metrics before the database is opened, otherwise the metrics are not updated +// Setup statistics before the database is opened, otherwise the statistics are not updated // with the measurements from Rocks DB -maybeSetUpMetricsRecorder(configs); +maybeSetUpStatistics(configs); openRocksDB(dbOptions, columnFamilyOptions); open = true; + +addValueProvidersToMetricsRecorder(configs); } -private void maybeSetUpMetricsRecorder(final Map configs) { -if (userSpecifiedOptions.statistics() == null && +private void maybeSetUpStatistics(final Map configs) { +if (userSpecifiedOptions.statistics() != null) { +userSpecifiedStatistics = true; +} +if (!userSpecifiedStatistics && RecordingLevel.forName((String) configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) { -isStatisticsRegistered = true; // metrics recorder will clean up statistics object final Statistics statistics = new Statistics(); userSpecifiedOptions.setStatistics(statistics); -metricsRecorder.addStatistics(name, statistics); +} +} + +private void addValueProvidersToMetricsRecorder(final Map configs) { +final TableFormatConfig tableFormatConfig = userSpecifiedOptions.tableFormatConfig(); +final Statistics statistics = userSpecifiedStatistics ? null : userSpecifiedOptions.statistics(); +if (tableFormatConfig instanceof BlockBasedTableConfigWithAccessibleCache) { +final Cache cache = ((BlockBasedTableConfigWithAccessibleCache) tableFormatConfig).blockCache(); +metricsRecorder.addValueProviders(name, db, cache, statistics); +} else { +metricsRecorder.addValueProviders(name, db, null, statistics); +log.warn("A table format configuration is used that does not expose the block cache. This means " + +"that metrics that relate to the block cache may be wrong if the block cache is shared."); } Review comment: Ah, after reading your test, I now see the issue. I'd overlooked the fact that users would independently construct the table config object AND the cache. I see now that this makes it impossible to reliably capture the cache, since users have to actually choose to pass our special table config to the Options and then pass the Cache to that table config. This doesn't seem ideal. What do you think about just using reflection instead? ```suggestion if (tableFormatConfig instanceof BlockBasedTableConfig) { final BlockBasedTableConfig blockBasedTableConfig = (BlockBasedTableConfig) tableFormatConfig; try { final Field blockCacheField = BlockBasedTableConfig.class.getDeclaredField("blockCache_"); blockCacheField.setAccessible(true); final Cache nullableBlockCache = (Cache) blockCacheField.get(blockBasedTableConfig); metricsRecorder.addValueProviders(name, db, nullableBlockCache, statistics); } catch (final NoSuchFieldException | IllegalAccessException | ClassCastException e) { log.warn("Expected to find and access field 'blockCache_' in BlockBasedTableConfig. " + "Probably, an incompatible version of RocksDB is being used. " + "Cache will be missing from memory metrics.", e); } } else { metricsRecorder.addValueProviders(name, db, null, statistics); } ``` We would obviously test all the branches here to de-risk the reflection. 
We can also add a test that searches the classpath for implementations of TableFormatConfig to ensure we don't miss the memo if RocksDB adds a new TableFormatConfig implementation. Alternative thought, if you don't like the reflection: We would _also_ subclass Options and override `org.rocksdb.Options#setTableFormatConfig` to check if the passed `TableFormatConfig` is a `BlockBasedTableConfig`, and if so, then _we_ wrap it with `BlockBasedTableConfigWithAccessibleCache`. ## File path: streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java ## @@ -227,7 +228,8 @@ public MockProcessorContext(final Properties config, final TaskId taskId, final this.metrics = new StreamsMetricsImpl( new Metrics(metricConfig), thre
[GitHub] [kafka] vvcephei commented on a change in pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording
vvcephei commented on a change in pull request #9098: URL: https://github.com/apache/kafka/pull/9098#discussion_r463128398 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ## @@ -181,23 +181,39 @@ void openDB(final ProcessorContext context) { throw new ProcessorStateException(fatal); } -// Setup metrics before the database is opened, otherwise the metrics are not updated +// Setup statistics before the database is opened, otherwise the statistics are not updated // with the measurements from Rocks DB -maybeSetUpMetricsRecorder(configs); +maybeSetUpStatistics(configs); openRocksDB(dbOptions, columnFamilyOptions); open = true; + +addValueProvidersToMetricsRecorder(configs); } -private void maybeSetUpMetricsRecorder(final Map configs) { -if (userSpecifiedOptions.statistics() == null && +private void maybeSetUpStatistics(final Map configs) { +if (userSpecifiedOptions.statistics() != null) { +userSpecifiedStatistics = true; +} +if (!userSpecifiedStatistics && RecordingLevel.forName((String) configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) { -isStatisticsRegistered = true; // metrics recorder will clean up statistics object final Statistics statistics = new Statistics(); userSpecifiedOptions.setStatistics(statistics); -metricsRecorder.addStatistics(name, statistics); +} +} + +private void addValueProvidersToMetricsRecorder(final Map configs) { +final TableFormatConfig tableFormatConfig = userSpecifiedOptions.tableFormatConfig(); +final Statistics statistics = userSpecifiedStatistics ? null : userSpecifiedOptions.statistics(); +if (tableFormatConfig instanceof BlockBasedTableConfigWithAccessibleCache) { +final Cache cache = ((BlockBasedTableConfigWithAccessibleCache) tableFormatConfig).blockCache(); +metricsRecorder.addValueProviders(name, db, cache, statistics); +} else { +metricsRecorder.addValueProviders(name, db, null, statistics); +log.warn("A table format configuration is used that does not expose the block cache. This means " + +"that metrics that relate to the block cache may be wrong if the block cache is shared."); } Review comment: Ah, after reading your test, I now see the issue. I'd overlooked the fact that users would independently construct the table config object AND the cache. I see now that this makes it impossible to reliably capture the cache, since users have to actually choose to pass our special table config to the Options and then pass the Cache to that table config. This doesn't seem ideal. What do you think about just using reflection instead? ```suggestion if (tableFormatConfig instanceof BlockBasedTableConfig) { final BlockBasedTableConfig blockBasedTableConfig = (BlockBasedTableConfig) tableFormatConfig; try { final Field blockCacheField = BlockBasedTableConfig.class.getDeclaredField("blockCache_"); blockCacheField.setAccessible(true); final Cache nullableBlockCache = (Cache) blockCacheField.get(blockBasedTableConfig); metricsRecorder.addValueProviders(name, db, nullableBlockCache, statistics); } catch (final NoSuchFieldException | IllegalAccessException | ClassCastException e) { log.warn("Expected to find and access field 'blockCache_' in BlockBasedTableConfig. " + "Probably, an incompatible version of RocksDB is being used. " + "Cache will be missing from memory metrics.", e); metricsRecorder.addValueProviders(name, db, null, statistics); } } else { metricsRecorder.addValueProviders(name, db, null, statistics); } ``` We would obviously test all the branches here to de-risk the reflection. 
We can also add a test that searches the classpath for implementations of TableFormatConfig to ensure we don't miss the memo if RocksDB adds a new TableFormatConfig implementation. Alternative thought, if you don't like the reflection: We would _also_ subclass Options and override `org.rocksdb.Options#setTableFormatConfig` to check if the passed `TableFormatConfig` is a `BlockBasedTableConfig`, and if so, then _we_ wrap it with `BlockBasedTableConfigWithAccessibleCache`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about th
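For reference, a minimal sketch of what a table config with an accessible cache could look like. This is an illustration only, not the actual Kafka Streams class of the same name, which may differ in detail:
```java
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;

// Sketch only: remember the block cache handed to setBlockCache so that RocksDBStore can
// later pass the same Cache instance to the metrics recorder.
class BlockBasedTableConfigWithAccessibleCache extends BlockBasedTableConfig {

    private Cache blockCache = null;

    @Override
    public BlockBasedTableConfig setBlockCache(final Cache cache) {
        blockCache = cache;
        return super.setBlockCache(cache);
    }

    public Cache blockCache() {
        return blockCache;
    }
}
```
With something like this in place, the Options subclass described above could wrap any plain BlockBasedTableConfig it receives, provided it can also transfer the original settings onto the wrapper.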
[GitHub] [kafka] mimaison commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…
mimaison commented on a change in pull request #8730: URL: https://github.com/apache/kafka/pull/8730#discussion_r463097918 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ## @@ -367,14 +406,37 @@ public void testOneWayReplicationWithAutorOffsetSync1() throws InterruptedExcept time.sleep(5000); // create a consumer at backup cluster with same consumer group Id to consume old and new topic -consumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap( -"group.id", "consumer-group-1"), "primary.test-topic-1", "primary.test-topic-2"); +consumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps, "primary.test-topic-1", "primary.test-topic-2"); records = consumer.poll(Duration.ofMillis(500)); // similar reasoning as above, no more records to consume by the same consumer group at backup cluster assertEquals("consumer record size is not zero", 0, records.count()); consumer.close(); +} + +private void produceMessages(EmbeddedConnectCluster cluster, String topicName, int partitions, String msgPrefix) { +for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) { +// produce to all partitions but the last one Review comment: This comment needs updating ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ## @@ -190,24 +211,19 @@ public void close() { public void testReplication() throws InterruptedException { // create consumers before starting the connectors so we don't need to wait for discovery -Consumer consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap( -"group.id", "consumer-group-1"), "test-topic-1", "backup.test-topic-1"); +Consumer consumer1 = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1", "backup.test-topic-1"); consumer1.poll(Duration.ofMillis(500)); consumer1.commitSync(); consumer1.close(); -Consumer consumer2 = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap( -"group.id", "consumer-group-1"), "test-topic-1", "primary.test-topic-1"); +Consumer consumer2 = backup.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1", "primary.test-topic-1"); Review comment: Do we still need these 2 blocks? In `setup()` we already consumed all messages ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ## @@ -128,10 +136,23 @@ public void setup() throws InterruptedException { backup.kafka().createTopic("primary.test-topic-1", 1); backup.kafka().createTopic("heartbeats", 1); -for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) { -primary.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-1-" + i); -backup.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-2-" + i); -} +// produce to all partitions but the last one Review comment: Would it be better using a separate topic in order to keep a partition without any records? 
By changing this topic it affects existing checks in all tests ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ## @@ -128,10 +136,23 @@ public void setup() throws InterruptedException { backup.kafka().createTopic("primary.test-topic-1", 1); backup.kafka().createTopic("heartbeats", 1); -for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) { -primary.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-1-" + i); -backup.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-2-" + i); -} +// produce to all partitions but the last one +produceMessages(primary, "test-topic-1", NUM_PARTITIONS - 1, "message-1-"); +produceMessages(backup, "test-topic-1", NUM_PARTITIONS - 1, "message-2-"); + +consumerProps = new HashMap() {{ Review comment: As this does not change, I wonder if we could direct initialize `consumerProps` when it's declared ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ## @@ -244,26 +251,50 @@ public void testReplication() throws InterruptedException { assertTrue("Offsets not translated downstream to backup cluster. Found: " + backupOffsets, backupOffsets.containsKey( new TopicPartition("primary.test-topic-1", 0))); +assertTrue("Offset of empty partition not translated downstream to backup cluster. Found: " + backupOffsets, backupOffsets.containsKey( +new TopicPartition("p
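On the consumerProps comment above, a minimal sketch of initializing the field at its declaration instead of inside setup(). The class name is made up and "group.id" is the only property visible in the quoted diff; anything further would be an assumption:
```java
import java.util.Collections;
import java.util.Map;

// Illustration only, not the actual test class: the shared consumer properties are built
// once where the field is declared, rather than re-created in setup().
public class ConsumerPropsSketch {
    private final Map<String, Object> consumerProps =
            Collections.singletonMap("group.id", "consumer-group-1");
}
```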
[GitHub] [kafka] mimaison commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test
mimaison commented on pull request #9029: URL: https://github.com/apache/kafka/pull/9029#issuecomment-666526465 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
mumrah commented on pull request #9008: URL: https://github.com/apache/kafka/pull/9008#issuecomment-666531948 These test failures are known flaky tests which already have jira tickets This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] highluck opened a new pull request #9105: MINOR: closable object Memory leak prevention
highluck opened a new pull request #9105: URL: https://github.com/apache/kafka/pull/9105 closable object Memory leak prevention ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah merged pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
mumrah merged pull request #9008: URL: https://github.com/apache/kafka/pull/9008 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10283) Consolidate client-level and consumer-level assignment within ClientState
[ https://issues.apache.org/jira/browse/KAFKA-10283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168100#comment-17168100 ] Guozhang Wang commented on KAFKA-10283: --- Sure [~high.lee]. I'd suggest you get familiar with the related classes of task assignment, StreamsPartitionAssignor, HighAvailabilityAssignor, especially the two-phase logic of 1) first assign across clients, and then 2) within each client assign across its consumers. And then you can propose a way to consolidate the data structures usage for these two phases. > Consolidate client-level and consumer-level assignment within ClientState > - > > Key: KAFKA-10283 > URL: https://issues.apache.org/jira/browse/KAFKA-10283 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Priority: Major > Labels: newbie++ > > In StreamsPartitionAssignor, we do a two-level assignment, one on the > client-level, and then after the assignment is done we further decide within > the client how to distributed among consumers if there are more. > The {{ClientState}} class is used for book-keeping the assigned tasks, > however it is only used for the first level, while for the second level it is > done outside of the class and we only keep track of the results in a few maps > for logging purposes. This leaves us with a bunch of hierarchical maps, e.g. > some on the client level and some on the consumer level. > We would like to consolidate some of these maps into a single data structure > for better keeping track of the assignment information, and also for less bug > vulnerability causing the assignment information to be inconsistent. -- This message was sent by Atlassian Jira (v8.3.4#803005)
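For readers new to the ticket, one possible shape of such a consolidated structure is sketched below. This is purely illustrative and not the existing ClientState API; the class and method names are invented for the example.

{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical consolidation: keep only the per-consumer assignment and derive the
// client-level view from it, instead of maintaining parallel maps for the two phases.
class ClientAssignmentSketch<T> {
    private final Map<String, Set<T>> tasksByConsumer = new HashMap<>();

    void assign(String consumerId, T taskId) {
        tasksByConsumer.computeIfAbsent(consumerId, c -> new HashSet<>()).add(taskId);
    }

    // Phase 1 (assignment across clients) reads this aggregate view.
    Set<T> allAssignedTasks() {
        Set<T> all = new HashSet<>();
        tasksByConsumer.values().forEach(all::addAll);
        return all;
    }

    // Phase 2 (assignment within a client, across its consumers) reads the per-consumer view.
    Set<T> tasksFor(String consumerId) {
        return tasksByConsumer.getOrDefault(consumerId, Set.of());
    }
}
{code}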
[GitHub] [kafka] abbccdda commented on pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol
abbccdda commented on pull request #8295: URL: https://github.com/apache/kafka/pull/8295#issuecomment-666590526 @mimaison Could you rebase? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10328) MockConsumer behaves differently than KafkaConsumer
Jeff Kim created KAFKA-10328: Summary: MockConsumer behaves differently than KafkaConsumer Key: KAFKA-10328 URL: https://issues.apache.org/jira/browse/KAFKA-10328 Project: Kafka Issue Type: Bug Components: consumer Reporter: Jeff Kim Assignee: Jeff Kim The behavior of `MockConsumer` is different from that of `KafkaConsumer` under multi-threaded access. MockConsumer should throw `ConcurrentModificationException` on concurrent access instead of synchronizing all methods. -- This message was sent by Atlassian Jira (v8.3.4#803005)
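A rough illustration of the kind of guard the report is asking for, loosely modelled on KafkaConsumer's acquire/release pattern; the names and details here are illustrative, not the actual implementation.

{code:java}
import java.util.concurrent.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicLong;

// Light-weight single-threaded-access check: fail fast with
// ConcurrentModificationException instead of serializing callers with synchronized.
class ThreadAccessGuard {
    private static final long NO_OWNER = -1L;
    private final AtomicLong ownerThreadId = new AtomicLong(NO_OWNER);
    private int refCount = 0; // only touched by the owning thread

    void acquire() {
        long callerId = Thread.currentThread().getId();
        if (callerId != ownerThreadId.get() && !ownerThreadId.compareAndSet(NO_OWNER, callerId))
            throw new ConcurrentModificationException("Consumer is not safe for multi-threaded access");
        refCount++;
    }

    void release() {
        if (--refCount == 0)
            ownerThreadId.set(NO_OWNER);
    }
}
{code}

A MockConsumer matching this behavior would wrap each public method in acquire()/release() rather than declaring every method synchronized.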
[GitHub] [kafka] rhauch opened a new pull request #9106: KAFKA-10146, KAFKA-9066: Retain metrics for failed tasks (#8502)
rhauch opened a new pull request #9106: URL: https://github.com/apache/kafka/pull/9106 Targets the `2.4` branch. See #8854 for the corresponding PR for the `2.5` branch, which is currently blocked by the 2.5.1 release effort. This backports the KAFKA-9066 / #8502 changes to retain metrics for failed tasks that were already merged to trunk and backported to the 2.6 branch. Like #8854, this PR has one change relative to the original PR: it removes an integration test added in the 2.6 branch for KIP-158 and modified as part of KAFKA-9066 / #8502. Author: Chris Egerton Reviewers: Nigel Liang, Randall Hauch ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10322) InMemoryWindowStore restore keys format incompatibility (lack of sequenceNumber in keys on topic)
[ https://issues.apache.org/jira/browse/KAFKA-10322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168155#comment-17168155 ] Sophie Blee-Goldman commented on KAFKA-10322: - Hey [~tbradlo] Thanks for the bug report. I agree, this is obviously incorrect. What I'm not yet understanding is why we don't run into this problem all the time -- the WindowKeySchema methods are used pretty heavily by RocksDBWindowStore as well, and from a glance they all seem to assume that the serialized bytes include a sequence number. I'll try to set up a test to figure out the true extent of this problem. Would you be interested in submitting a PR with the fix? > InMemoryWindowStore restore keys format incompatibility (lack of > sequenceNumber in keys on topic) > - > > Key: KAFKA-10322 > URL: https://issues.apache.org/jira/browse/KAFKA-10322 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 > Environment: windows/linux >Reporter: Tomasz Bradło >Priority: Major > > I have regular groupBy&Counting stream configuration: > {code:java} > > fun addStream(kStreamBuilder: StreamsBuilder) { > val storeSupplier = Stores.inMemoryWindowStore("count-store", > Duration.ofDays(10), > Duration.ofDays(1), > false) > val storeBuilder: StoreBuilder> = > Stores > .windowStoreBuilder(storeSupplier, > JsonSerde(CountableEvent::class.java), Serdes.Long()) > kStreamBuilder > .stream("input-topic", Consumed.with(Serdes.String(), > Serdes.String())) > .map {_, jsonRepresentation -> > KeyValue(eventsCountingDeserializer.deserialize(jsonRepresentation), null)} > .groupByKey() > .windowedBy(TimeWindows.of(Duration.ofDays(1))) > > .count(Materialized.with(JsonSerde(CountableEvent::class.java), > Serdes.Long())) > .toStream() > .to("topic1-count") > val storeConsumed = > Consumed.with(WindowedSerdes.TimeWindowedSerde(JsonSerde(CountableEvent::class.java), > Duration.ofDays(1).toMillis()), Serdes.Long()) > kStreamBuilder.addGlobalStore(storeBuilder, "topic1-count", > storeConsumed, passThroughProcessorSupplier) > }{code} > While sending to "topic1-count", for serializing the key > [TimeWindowedSerializer|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java] > is used which is using > [WindowKeySchema.toBinary|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L112] > so the message key format is: > {code:java} > real_grouping_key + timestamp(8bytes){code} > > Everything works. I can get correct values from state-store. But, in recovery > scenario, when [GlobalStateManagerImpl > |https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L317]enters > offset < highWatermark loop then > [InMemoryWindowStore stateRestoreCallback > |https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java#L105]reads > from "topic1-count" and fails to extract valid key and timestamp using > [WindowKeySchema.extractStoreKeyBytes > |https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L188]and > [WindowKeySchema.extractStoreTimestamp. 
> |https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L201]It > fails because it expects format: > {code:java} > real_grouping_key + timestamp(8bytes) + sequence_number(4bytes) {code} > How this is supposed to work in this case? -- This message was sent by Atlassian Jira (v8.3.4#803005)
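To make the two formats in the report concrete, a small sketch of the byte layouts being compared; the sizes are taken from the report and this is not the actual WindowKeySchema code.

{code:java}
import java.nio.ByteBuffer;

class WindowKeyLayouts {
    // What TimeWindowedSerializer / WindowKeySchema.toBinary writes to the topic:
    // [ raw grouping key ][ 8-byte window start timestamp ]
    static byte[] changelogKey(byte[] key, long windowStartMs) {
        return ByteBuffer.allocate(key.length + 8).put(key).putLong(windowStartMs).array();
    }

    // What the store-side extractors expect during restore:
    // [ raw grouping key ][ 8-byte timestamp ][ 4-byte sequence number ]
    static byte[] storeKey(byte[] key, long windowStartMs, int seqnum) {
        return ByteBuffer.allocate(key.length + 8 + 4).put(key).putLong(windowStartMs).putInt(seqnum).array();
    }
}
{code}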
[jira] [Commented] (KAFKA-10146) Backport KAFKA-9066 to 2.5 and 2.4 branches
[ https://issues.apache.org/jira/browse/KAFKA-10146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168157#comment-17168157 ] Randall Hauch commented on KAFKA-10146: --- Since the 2.5.1 release is still ongoing, the PR for the 2.5 branch (https://github.com/apache/kafka/pull/8854) is blocked. Since the 2.4 branch is unblocked and we want to get this feature into the next 2.4.x release, I created a different PR targeting the 2.4 branch (https://github.com/apache/kafka/pull/9106). > Backport KAFKA-9066 to 2.5 and 2.4 branches > --- > > Key: KAFKA-10146 > URL: https://issues.apache.org/jira/browse/KAFKA-10146 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Reporter: Randall Hauch >Assignee: Randall Hauch >Priority: Major > Labels: backport > Fix For: 2.4.2, 2.5.2 > > > KAFKA-9066 was merged on the same day we were trying to release 2.5.1, so > this was not backported at the time. However, once 2.5.1 is out the door, the > `775f0d484` commit on `trunk` should be backported to the `2.5` and `2.4` > branches. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] rhauch commented on pull request #9106: KAFKA-10146, KAFKA-9066: Retain metrics for failed tasks (backport to 2.4)
rhauch commented on pull request #9106: URL: https://github.com/apache/kafka/pull/9106#issuecomment-11491 @kkonstantine, here is a PR for the `2.4` branch that is a cherry-pick of the same one commit from #8854 that you've already approved. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10329) Enable connector context in logs by default
Randall Hauch created KAFKA-10329: - Summary: Enable connector context in logs by default Key: KAFKA-10329 URL: https://issues.apache.org/jira/browse/KAFKA-10329 Project: Kafka Issue Type: Improvement Components: KafkaConnect Affects Versions: 3.0.0 Reporter: Randall Hauch Fix For: 3.0.0 When [KIP-449|https://cwiki.apache.org/confluence/display/KAFKA/KIP-449%3A+Add+connector+contexts+to+Connect+worker+logs] was implemented and released as part of AK 2.3, we chose not to enable this extra logging context information by default because it was not backward compatible, and anyone relying upon the `connect-log4j.properties` file provided by the AK distribution would, after an upgrade to AK 2.3 (or later), see different formats for their logs, which could break any log processing functionality they were relying upon. However, we should enable this in AK 3.0, whenever that comes. Doing so will require a fairly minor KIP to change the `connect-log4j.properties` file slightly. Marked this as BLOCKER since it's a backward incompatible change that we definitely want to do in the 3.0.0 release. -- This message was sent by Atlassian Jira (v8.3.4#803005)
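For context, the change described here amounts to switching the logging pattern in `connect-log4j.properties` so that the connector-context MDC introduced by KIP-449 is printed by default. A rough sketch of what that could look like; the property names approximate the shipped file but may not match it exactly.

{code}
# Print the KIP-449 connector context (e.g. "[my-connector|task-0] ") in every log line.
# %X{connector.context} reads the MDC value set by the Connect worker.
connect.log.pattern=[%d] %p %X{connector.context}%m (%c:%L)%n

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=${connect.log.pattern}
{code}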
[jira] [Commented] (KAFKA-10322) InMemoryWindowStore restore keys format incompatibility (lack of sequenceNumber in keys on topic)
[ https://issues.apache.org/jira/browse/KAFKA-10322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168167#comment-17168167 ] Tomasz Bradło commented on KAFKA-10322: --- Hi [~ableegoldman] I don't know what the expected behaviour is here. Should the recovery lambdas (in both InMemoryWindowStore and RocksDBWindowStore) assume that the keys read from the topic will have no sequence_number? > InMemoryWindowStore restore keys format incompatibility (lack of > sequenceNumber in keys on topic) > - > > Key: KAFKA-10322 > URL: https://issues.apache.org/jira/browse/KAFKA-10322 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 > Environment: windows/linux >Reporter: Tomasz Bradło >Priority: Major > > I have regular groupBy&Counting stream configuration: > {code:java} > > fun addStream(kStreamBuilder: StreamsBuilder) { > val storeSupplier = Stores.inMemoryWindowStore("count-store", > Duration.ofDays(10), > Duration.ofDays(1), > false) > val storeBuilder: StoreBuilder> = > Stores > .windowStoreBuilder(storeSupplier, > JsonSerde(CountableEvent::class.java), Serdes.Long()) > kStreamBuilder > .stream("input-topic", Consumed.with(Serdes.String(), > Serdes.String())) > .map {_, jsonRepresentation -> > KeyValue(eventsCountingDeserializer.deserialize(jsonRepresentation), null)} > .groupByKey() > .windowedBy(TimeWindows.of(Duration.ofDays(1))) > > .count(Materialized.with(JsonSerde(CountableEvent::class.java), > Serdes.Long())) > .toStream() > .to("topic1-count") > val storeConsumed = > Consumed.with(WindowedSerdes.TimeWindowedSerde(JsonSerde(CountableEvent::class.java), > Duration.ofDays(1).toMillis()), Serdes.Long()) > kStreamBuilder.addGlobalStore(storeBuilder, "topic1-count", > storeConsumed, passThroughProcessorSupplier) > }{code} > While sending to "topic1-count", for serializing the key > [TimeWindowedSerializer|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java] > is used which is using > [WindowKeySchema.toBinary|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L112] > so the message key format is: > {code:java} > real_grouping_key + timestamp(8bytes){code} > > Everything works. I can get correct values from state-store. But, in recovery > scenario, when [GlobalStateManagerImpl > |https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L317]enters > offset < highWatermark loop then > [InMemoryWindowStore stateRestoreCallback > |https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java#L105]reads > from "topic1-count" and fails to extract valid key and timestamp using > [WindowKeySchema.extractStoreKeyBytes > |https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L188]and > [WindowKeySchema.extractStoreTimestamp. > |https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L201]It > fails because it expects format: > {code:java} > real_grouping_key + timestamp(8bytes) + sequence_number(4bytes) {code} > How this is supposed to work in this case? 
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] abbccdda commented on a change in pull request #9096: MINOR: Add comments to constrainedAssign and generalAssign method
abbccdda commented on a change in pull request #9096: URL: https://github.com/apache/kafka/pull/9096#discussion_r463260878 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -163,6 +183,7 @@ private boolean allSubscriptionsEqual(Set allTopics, Map> assignment = new HashMap<>( consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(minQuota; +// step 1: Reassign as many previously owned partitions as possible Review comment: nit: I don't think we need steps here, which makes it hard to squeeze in more comments later. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison merged pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test
mimaison merged pull request #9029: URL: https://github.com/apache/kafka/pull/9029 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10017) Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta
[ https://issues.apache.org/jira/browse/KAFKA-10017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168214#comment-17168214 ] Mickael Maison commented on KAFKA-10017: A couple of more failures: - https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1780/consoleFull - https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1771/consoleFull > Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta > --- > > Key: KAFKA-10017 > URL: https://issues.apache.org/jira/browse/KAFKA-10017 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >Priority: Blocker > Labels: flaky-test, unit-test > Fix For: 2.6.0 > > > Creating a new ticket for this since the root cause is different than > https://issues.apache.org/jira/browse/KAFKA-9966 > With injectError = true: > h3. Stacktrace > java.lang.AssertionError: Did not receive all 20 records from topic > multiPartitionOutputTopic within 6 ms Expected: is a value equal to or > greater than <20> but: <15> was less than <20> at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530) > at > org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973) > at > org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961) > at > org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mimaison commented on pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol
mimaison commented on pull request #8295: URL: https://github.com/apache/kafka/pull/8295#issuecomment-666701712 @abbccdda Done! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10322) InMemoryWindowStore restore keys format incompatibility (lack of sequenceNumber in keys on topic)
[ https://issues.apache.org/jira/browse/KAFKA-10322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168231#comment-17168231 ] Sophie Blee-Goldman commented on KAFKA-10322: - Ok, I dug into this a bit further and found that the changelogging layer will actually _always_ insert a sequence number, even when it's not necessary (i.e. retainDuplicates is false). On restoration this extra sequence number is dropped, so the correct bytes are ultimately inserted into the store. While this is obviously a bug in that we're storing an extra 4 bytes in the changelog, it seems like the key extraction should technically work on restoration. Did you hit an exception or other error after a restore, or did you just notice that something was wrong while looking at the source? > InMemoryWindowStore restore keys format incompatibility (lack of > sequenceNumber in keys on topic) > - > > Key: KAFKA-10322 > URL: https://issues.apache.org/jira/browse/KAFKA-10322 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 > Environment: windows/linux >Reporter: Tomasz Bradło >Priority: Major > > I have regular groupBy&Counting stream configuration: > {code:java} > > fun addStream(kStreamBuilder: StreamsBuilder) { > val storeSupplier = Stores.inMemoryWindowStore("count-store", > Duration.ofDays(10), > Duration.ofDays(1), > false) > val storeBuilder: StoreBuilder> = > Stores > .windowStoreBuilder(storeSupplier, > JsonSerde(CountableEvent::class.java), Serdes.Long()) > kStreamBuilder > .stream("input-topic", Consumed.with(Serdes.String(), > Serdes.String())) > .map {_, jsonRepresentation -> > KeyValue(eventsCountingDeserializer.deserialize(jsonRepresentation), null)} > .groupByKey() > .windowedBy(TimeWindows.of(Duration.ofDays(1))) > > .count(Materialized.with(JsonSerde(CountableEvent::class.java), > Serdes.Long())) > .toStream() > .to("topic1-count") > val storeConsumed = > Consumed.with(WindowedSerdes.TimeWindowedSerde(JsonSerde(CountableEvent::class.java), > Duration.ofDays(1).toMillis()), Serdes.Long()) > kStreamBuilder.addGlobalStore(storeBuilder, "topic1-count", > storeConsumed, passThroughProcessorSupplier) > }{code} > While sending to "topic1-count", for serializing the key > [TimeWindowedSerializer|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java] > is used which is using > [WindowKeySchema.toBinary|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L112] > so the message key format is: > {code:java} > real_grouping_key + timestamp(8bytes){code} > > Everything works. I can get correct values from state-store. 
But, in recovery > scenario, when [GlobalStateManagerImpl > |https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L317]enters > offset < highWatermark loop then > [InMemoryWindowStore stateRestoreCallback > |https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java#L105]reads > from "topic1-count" and fails to extract valid key and timestamp using > [WindowKeySchema.extractStoreKeyBytes > |https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L188]and > [WindowKeySchema.extractStoreTimestamp. > |https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L201]It > fails because it expects format: > {code:java} > real_grouping_key + timestamp(8bytes) + sequence_number(4bytes) {code} > How this is supposed to work in this case? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10137) Clean-up retain Duplicate logic in Window Stores
[ https://issues.apache.org/jira/browse/KAFKA-10137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168234#comment-17168234 ] Sophie Blee-Goldman commented on KAFKA-10137: - I think there's actually a real bug lurking here: I was looking at the ChangeLoggingWindowBytesStore and noticed we seem to insert the sequence number into the changelogged bytes regardless of `retainDuplicates`. We peel off the unnecessary seqnum during restoration, so it doesn't seem to cause any correctness issues. But we're obviously storing an extra 4 bytes per window store changelog record for no reason. Unfortunately, I'm not sure how this can be fixed in a backwards-compatible way. > Clean-up retain Duplicate logic in Window Stores > > > Key: KAFKA-10137 > URL: https://issues.apache.org/jira/browse/KAFKA-10137 > Project: Kafka > Issue Type: Task > Components: streams >Affects Versions: 2.5.0 >Reporter: Bruno Cadonna >Priority: Minor > > Stream-stream joins use the regular `WindowStore` implementation but with > `retainDuplicates` set to true. To allow for duplicates while using the same > unique-key underlying stores we just wrap the key with an incrementing > sequence number before inserting it. > The logic to maintain and append the sequence number is present in multiple > locations, namely in the changelogging window store and in its underlying > window stores. We should consolidate this code to one single location. -- This message was sent by Atlassian Jira (v8.3.4#803005)
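To illustrate the duplicate-retention scheme the ticket describes (wrapping the key with an incrementing sequence number so a unique-key store can hold duplicates), a minimal sketch; this is not the actual window-store code.

{code:java}
import java.nio.ByteBuffer;

class DuplicateRetainingKeys {
    private int seqnum = 0;

    // With retainDuplicates enabled, two puts for the same windowed key get distinct
    // underlying keys, so the second insert does not overwrite the first.
    byte[] maybeWrap(byte[] windowedKeyBytes, boolean retainDuplicates) {
        if (!retainDuplicates)
            return windowedKeyBytes;
        seqnum = (seqnum + 1) & 0x7FFFFFFF; // keep the suffix non-negative as the counter wraps
        return ByteBuffer.allocate(windowedKeyBytes.length + 4)
                .put(windowedKeyBytes)
                .putInt(seqnum)
                .array();
    }
}
{code}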