[GitHub] [kafka] vvcephei merged pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets
vvcephei merged pull request #9066: URL: https://github.com/apache/kafka/pull/9066 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #9084: MINOR: Preserve Kafka exception from RebalanceListener [Do Not Merge]
guozhangwang commented on pull request #9084: URL: https://github.com/apache/kafka/pull/9084#issuecomment-664698317 @abbccdda The exception is thrown from consumer.poll(), and the caller of the consumer could be expecting specific exceptions, e.g. TaskMigratedException is handled specifically in Streams, but a general KafkaException would be treated as a fatal error. We're already doing this e.g. in ConsumerCoordinator line 432, but it is not done in all cases, so I'm just trying to make the behavior consistent across all callers here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
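For illustration, the pattern described above could look roughly like the following sketch. This is a hypothetical helper, not the actual ConsumerCoordinator code; the class and method names are made up. The point is that KafkaException subclasses are rethrown as-is so callers of consumer.poll() can still catch specific types such as TaskMigratedException, while everything else is wrapped in a generic KafkaException.

```java
// Hypothetical helper (not the actual ConsumerCoordinator code) showing the idea.
import org.apache.kafka.common.KafkaException;

final class RebalanceCallbackInvoker {

    static void invokeSafely(final Runnable userCallback, final String callbackName) {
        try {
            userCallback.run();
        } catch (final KafkaException e) {
            // already a Kafka exception: rethrow as-is so specific handling
            // (such as TaskMigratedException in Streams) is not lost
            throw e;
        } catch (final Throwable t) {
            // anything else becomes a generic KafkaException
            throw new KafkaException("User rebalance callback " + callbackName + " failed", t);
        }
    }
}
```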
[GitHub] [kafka] dajac commented on a change in pull request #9072: KAFKA-10162; Make the rate based quota behave more like a Token Bucket (KIP-599, Part III)
dajac commented on a change in pull request #9072: URL: https://github.com/apache/kafka/pull/9072#discussion_r461123154 ## File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.metrics.stats; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.kafka.common.metrics.MetricConfig; + +public class TokenBucket extends SampledStat { Review comment: Note to myself: Add javadoc. ## File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.metrics.stats; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.kafka.common.metrics.MetricConfig; + +public class TokenBucket extends SampledStat { + +private final TimeUnit unit; + +public TokenBucket() { +this(TimeUnit.SECONDS); +} + +public TokenBucket(TimeUnit unit) { Review comment: That is correct. This is bit unfortunate but as the two are independent from each others, we can't enforce using the same timeunit nor reuse the time unit of the rate. I will make this clear in the javadoc. ## File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.metrics.stats; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.kafka.common.metrics.MetricConfig; + +public class TokenBucket extends SampledStat { + +private final TimeUnit unit; + +public TokenBucket() { +this(TimeUnit.SECONDS); +} + +public TokenBucket(TimeUnit unit) { +super(0); +this.unit = unit; +} + +@Override +public void record(MetricConfig config, double value, long timeMs) { +if (value < 0) { +unrecord(config, -value, timeMs); +return; +} + +final double quota = quota(config); +final long firstTimeWindowMs = firstTimeWindowMs(config, timeMs); + +// Get current sample or create one if empty +Sample sample = current(firstTimeWindowMs); + +// Verify that the current sample was not reinitialized. If it was the case, +// restart from the first time window
[GitHub] [kafka] guozhangwang commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from committing
guozhangwang commented on a change in pull request #8964: URL: https://github.com/apache/kafka/pull/8964#discussion_r461266795 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java ## @@ -38,13 +41,39 @@ */ final class StateManagerUtil { static final String CHECKPOINT_FILE_NAME = ".checkpoint"; +static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L; Review comment: Yeah my major concern is to tie the flushing policy with rocksdb -- although it is the default persistent stores now, we should avoid tying with a specific type of stores. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -454,6 +456,41 @@ public void flush() { } } +public void flushCache() { +RuntimeException firstException = null; +// attempting to flush the stores +if (!stores.isEmpty()) { +log.debug("Flushing all store caches registered in the state manager: {}", stores); +for (final StateStoreMetadata metadata : stores.values()) { +final StateStore store = metadata.stateStore; + +try { +// buffer should be flushed to send all records to changelog +if (store instanceof TimeOrderedKeyValueBuffer) { Review comment: I'm thinking we can remove the whole `flushCache` method. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java ## @@ -38,13 +41,39 @@ */ final class StateManagerUtil { static final String CHECKPOINT_FILE_NAME = ".checkpoint"; +static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L; private StateManagerUtil() {} static RecordConverter converterForStore(final StateStore store) { return isTimestamped(store) ? rawValueToTimestampedValue() : identity(); } +static boolean checkpointNeeded(final boolean enforceCheckpoint, +final Map oldOffsetSnapshot, +final Map newOffsetSnapshot) { +// we should always have the old snapshot post completing the register state stores; +// if it is null it means the registration is not done and hence we should not overwrite the checkpoint +if (oldOffsetSnapshot == null) +return false; + +// if the previous snapshot is empty while the current snapshot is not then we should always checkpoint; +// note if the task is stateless or stateful but no stores logged, the snapshot would also be empty +// and hence it's okay to not checkpoint +if (oldOffsetSnapshot.isEmpty() && !newOffsetSnapshot.isEmpty()) +return true; + +// we can checkpoint if the the difference between the current and the previous snapshot is large enough +long totalOffsetDelta = 0L; +for (final Map.Entry entry : newOffsetSnapshot.entrySet()) { +totalOffsetDelta += Math.abs(oldOffsetSnapshot.getOrDefault(entry.getKey(), 0L) - entry.getValue()); +} + +// when enforcing checkpoint is required, we should overwrite the checkpoint if it is different from the old one; +// otherwise, we only overwrite the checkpoint if it is largely different from the old one +return enforceCheckpoint ? totalOffsetDelta > 0 : totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT; Review comment: I think we do not need to enforce checkpoint during suspension but only need to do that during closure / recycling; if a suspended task is resumed then we do not need to write checkpoint in between. But admittedly moving forward most suspended tasks would be closed or recycled :slightly_smiling_face: So I can change that back. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
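To make the checkpoint decision rule discussed above easier to follow, here is a compilable, simplified sketch. It uses plain String keys instead of TopicPartition and is not the actual StateManagerUtil code; the threshold value is the hard-coded one mentioned in the PR discussion.

```java
import java.util.Map;

final class CheckpointDecision {
    // hard-coded for now, per the PR discussion; may become a config in a follow-up KIP
    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;

    static boolean checkpointNeeded(final boolean enforceCheckpoint,
                                    final Map<String, Long> oldOffsetSnapshot,
                                    final Map<String, Long> newOffsetSnapshot) {
        // store registration not completed yet: never overwrite the checkpoint
        if (oldOffsetSnapshot == null) {
            return false;
        }
        // first offsets ever seen for this task: always checkpoint
        if (oldOffsetSnapshot.isEmpty() && !newOffsetSnapshot.isEmpty()) {
            return true;
        }
        // otherwise checkpoint only if the offsets have moved far enough since the last snapshot
        long totalOffsetDelta = 0L;
        for (final Map.Entry<String, Long> entry : newOffsetSnapshot.entrySet()) {
            totalOffsetDelta += Math.abs(entry.getValue() - oldOffsetSnapshot.getOrDefault(entry.getKey(), 0L));
        }
        return enforceCheckpoint ? totalOffsetDelta > 0 : totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;
    }
}
```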
[GitHub] [kafka] rgroothuijsen commented on pull request #9089: KAFKA-10224: Update jersey license from CDDL to EPLv2
rgroothuijsen commented on pull request #9089: URL: https://github.com/apache/kafka/pull/9089#issuecomment-665171345 @rhauch No problem, thanks for the help! I used the version from the Eclipse home page, but this one has much nicer formatting as well. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rgroothuijsen opened a new pull request #9089: KAFKA-10224: Update jersey license from CDDL to EPLv2
rgroothuijsen opened a new pull request #9089: URL: https://github.com/apache/kafka/pull/9089 Jersey has changed its license from CDDL to EPLv2 starting from version 2.28. This PR updates the included license information to reflect this. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a change in pull request #9050: KAFKA-10193: Add preemption for controller events that have callbacks
jeffkbkim commented on a change in pull request #9050: URL: https://github.com/apache/kafka/pull/9050#discussion_r461742057 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -1986,101 +1965,125 @@ private[controller] class ControllerStats extends KafkaMetricsGroup { sealed trait ControllerEvent { def state: ControllerState + def preempt(): Unit } case object ControllerChange extends ControllerEvent { - override def state = ControllerState.ControllerChange + override def state: ControllerState = ControllerState.ControllerChange + override def preempt(): Unit = {} Review comment: @mumrah one of the issues was that people are able to create controller events without implementing preemption. In most cases, this is fine because most events don't have callbacks. But for events that do have callbacks, someone can forget to implement preemption and this won't be caught during compile time. This is to force an implementation of preemption which can be empty when designing a new event. ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -2090,6 +2093,12 @@ case class ReplicaLeaderElection( callback: ElectLeadersCallback = _ => {} ) extends ControllerEvent { override def state: ControllerState = ControllerState.ManualLeaderBalance + + override def preempt(): Unit = callback( +partitionsFromAdminClientOpt.fold(Map.empty[TopicPartition, Either[ApiError, Int]]) { partitions => Review comment: @mumrah i believe the initial `Map.empty[TopicPartition, Either[ApiError, Int]]` was used to structure the return type (`ElectLeadersCallback: Map[TopicPartition, Either[ApiError, Int]] => Unit`) which cannot be achieved using `foreach`. ## File path: core/src/main/scala/kafka/controller/ControllerEventManager.scala ## @@ -77,7 +77,7 @@ class ControllerEventManager(controllerId: Int, private val putLock = new ReentrantLock() private val queue = new LinkedBlockingQueue[QueuedEvent] // Visible for test - private[controller] val thread = new ControllerEventThread(ControllerEventThreadName) + private[controller] var thread = new ControllerEventThread(ControllerEventThreadName) Review comment: @jsancio updated the PR. i wrapped the entire method in a lock because the return type `QueuedEvent` can be retrieved by `put(event)`. the suggested code would require another variable to be initialized, then set inside the lock, then returned at the end. I thought it would be best to prefer simplicity and readability over performance as this only occurs in the start and end of `ControllerEventManager`'s lifecycle. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
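A Java analogue of the design point being debated above (the real code is the Scala ControllerEvent trait; the names here are illustrative only): leaving preempt() abstract means every new event type must state its preemption behaviour explicitly at compile time, even when that behaviour is a no-op.

```java
interface ControllerEvent {
    String state();

    // intentionally no default implementation: a new event type that forgets
    // to consider preemption fails to compile instead of silently dropping
    // its callback when the controller event queue is drained
    void preempt();
}

final class ControllerChange implements ControllerEvent {
    @Override
    public String state() {
        return "ControllerChange";
    }

    @Override
    public void preempt() {
        // no callback to complete, so preemption is a no-op
    }
}
```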
[GitHub] [kafka] guozhangwang commented on pull request #8964: KAFKA-9450: Decouple flushing state from committing
guozhangwang commented on pull request #8964: URL: https://github.com/apache/kafka/pull/8964#issuecomment-664688329 @ableegoldman @vvcephei I've thought about controlling the frequency of flushing, and I've decided to expose another public config to let users specify it. But I'd defer that to another PR. I'd like to do the following: 1) Merge this PR based on a hard-coded num.records difference. 2) Run benchmarks to understand its implications on stateful applications with / without EOS. 3) Then file a KIP that introduces a new config value and potentially changes the default commit interval with EOS. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
junrao commented on pull request #8657: URL: https://github.com/apache/kafka/pull/8657#issuecomment-664652669 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rgibaiev edited a comment on pull request #8575: KAFKA-8713 KIP-581 Add accept.optional.null to solve optional null
rgibaiev edited a comment on pull request #8575: URL: https://github.com/apache/kafka/pull/8575#issuecomment-665225469 Any update? @rhauch This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #9087: HOTFIX: Set session timeout and heartbeat interval to default to decrease flakiness
guozhangwang commented on pull request #9087: URL: https://github.com/apache/kafka/pull/9087#issuecomment-665171108 For correctness I agree; though some tests cover the rebalance triggered by a member dropping out of the group, in which case reducing the session timeout also largely reduces the overall run time of the test case, so for test runtime we may still want to check case by case. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rhauch commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets
rhauch commented on pull request #9066: URL: https://github.com/apache/kafka/pull/9066#issuecomment-665065731 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] apovzner commented on a change in pull request #9072: KAFKA-10162; Make the rate based quota behave more like a Token Bucket (KIP-599, Part III)
apovzner commented on a change in pull request #9072: URL: https://github.com/apache/kafka/pull/9072#discussion_r461297706 ## File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.metrics.stats; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.kafka.common.metrics.MetricConfig; + +public class TokenBucket extends SampledStat { + +private final TimeUnit unit; + +public TokenBucket() { +this(TimeUnit.SECONDS); +} + +public TokenBucket(TimeUnit unit) { Review comment: Would be useful to add javadoc here for `unit` param. I understand, the unit needs to match Quota representation in `config` param passed to `record()` method, right? ## File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.metrics.stats; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.kafka.common.metrics.MetricConfig; + +public class TokenBucket extends SampledStat { + +private final TimeUnit unit; + +public TokenBucket() { +this(TimeUnit.SECONDS); +} + +public TokenBucket(TimeUnit unit) { +super(0); +this.unit = unit; +} + +@Override +public void record(MetricConfig config, double value, long timeMs) { +if (value < 0) { +unrecord(config, -value, timeMs); +return; +} + +final double quota = quota(config); +final long firstTimeWindowMs = firstTimeWindowMs(config, timeMs); + +// Get current sample or create one if empty +Sample sample = current(firstTimeWindowMs); + +// Verify that the current sample was not reinitialized. 
If it was the case, +// restart from the first time window +if (sample.eventCount == 0) { +sample.reset(firstTimeWindowMs); +} + +// Add the value to the current sample +sample.value += value; +sample.eventCount += 1; + +// If current sample is completed AND a new one can be created, +// create one and spill over the amount above the quota to the +// new sample. Repeat until either the sample is not complete +// or no new sample can be created. +while (sample.isComplete(timeMs, config)) { +double extra = sample.value - quota; +sample.value = quota; + +sample = advance(config, sample.lastWindowMs + config.timeWindowMs()); +sample.value = extra; +sample.eventCount += 1; +} +} + +private void unrecord(MetricConfig config, double value, long timeMs) { +final double quota = quota(config); +final long firstTimeWindowMs = firstTimeWindowMs(config, timeMs); + +// Rewind +while (value > 0) { +// Get current sample or create one if empty +Sample sample = current(firstTimeWindowMs); + +// If the current sample has been purged, we can't unrecord anything +if (sample.eventCount == 0) { +return; +
[GitHub] [kafka] huxihx commented on pull request #8984: KAFKA-10227: Enforce cleanup policy to only contain compact or delete once
huxihx commented on pull request #8984: URL: https://github.com/apache/kafka/pull/8984#issuecomment-664804260 Ping @omkreddy @mimaison for review. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rhauch commented on pull request #8575: KAFKA-8713 KIP-581 Add accept.optional.null to solve optional null
rhauch commented on pull request #8575: URL: https://github.com/apache/kafka/pull/8575#issuecomment-665303585 This PR has a conflict that has to be resolved first. Also, KIP-581 must be passed before this can be merged. See the discussion thread for the KIP. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang edited a comment on pull request #9011: KAFKA-10134: Still depends on existence of any fetchable partitions to block on join
guozhangwang edited a comment on pull request #9011: URL: https://github.com/apache/kafka/pull/9011#issuecomment-665182983 @hachikuji That's what I was wondering as well. From the logs we have: https://issues.apache.org/jira/secure/attachment/13008127/consumer5.log.2020-07-22.log If you search for `Returning timer remaining` you can find that most of the time it is normal, and during that re-joining it does return 0, but that's expected since we could not connect to the broker during that period of time. So there's no surprising `timeToNextHeartbeat` returning 0 from that run. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9050: KAFKA-10193: Add preemption for controller events that have callbacks
mumrah commented on a change in pull request #9050: URL: https://github.com/apache/kafka/pull/9050#discussion_r461709093 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -1986,101 +1965,125 @@ private[controller] class ControllerStats extends KafkaMetricsGroup { sealed trait ControllerEvent { def state: ControllerState + def preempt(): Unit } case object ControllerChange extends ControllerEvent { - override def state = ControllerState.ControllerChange + override def state: ControllerState = ControllerState.ControllerChange + override def preempt(): Unit = {} Review comment: Can we define a default implementation on the trait rather than overriding all these with empty functions? ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -2090,6 +2093,12 @@ case class ReplicaLeaderElection( callback: ElectLeadersCallback = _ => {} ) extends ControllerEvent { override def state: ControllerState = ControllerState.ManualLeaderBalance + + override def preempt(): Unit = callback( +partitionsFromAdminClientOpt.fold(Map.empty[TopicPartition, Either[ApiError, Int]]) { partitions => Review comment: nit: I think you can use `foreach` instead of fold + default value This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac opened a new pull request #9091: MINOR; Make KafkaAdminClientTest.testDescribeLogDirsPartialFailure and KafkaAdminClientTest.testAlterReplicaLogDirsPartialFailure test more reli
dajac opened a new pull request #9091: URL: https://github.com/apache/kafka/pull/9091 I have seen these tests failing locally from time to time. It seems that the changes made in https://github.com/apache/kafka/pull/8864 have made them more fragile. I have updated them to be more reliable. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon opened a new pull request #9096: Add comments to constrainedAssign and generalAssign method
showuon opened a new pull request #9096: URL: https://github.com/apache/kafka/pull/9096 Recently, I tried to read the code to understand what the `constrainedAssign` and `generalAssign` methods are doing, but it was quite difficult due to the complexity of the algorithms. I then traced back to the JIRA ticket and KIP to get more information about them. So I think we should put the algorithm goals and main steps in the code comments, so that other developers can understand them better and troubleshoot more easily. **reference for `constrainedAssign` algorithm:** https://issues.apache.org/jira/browse/KAFKA-9987?focusedCommentId=17106832&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17106832 **reference for `generalAssign` algorithm:** https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] viktorsomogyi commented on pull request #4090: [KAFKA-6084] Propagate JSON parsing errors in ReassignPartitionsCommand
viktorsomogyi commented on pull request #4090: URL: https://github.com/apache/kafka/pull/4090#issuecomment-664920356 Also the jdk11&scala2.13 build timed out due to another test: ``` 12:49:33 org.apache.kafka.streams.processor.internals.GlobalStreamThreadTest > shouldDieOnInvalidOffsetExceptionDuringStartup STARTED 16:47:21 Build timed out (after 270 minutes). Marking the build as aborted. 16:47:21 Build was aborted 16:47:22 [FINDBUGS] Skipping publisher since build result is ABORTED 16:47:22 Recording test results 16:47:22 Setting MAVEN_LATEST__HOME=/home/jenkins/tools/maven/latest/ 16:47:22 Setting GRADLE_4_10_3_HOME=/home/jenkins/tools/gradle/4.10.3 16:47:22 16:47:22 org.apache.kafka.streams.processor.internals.GlobalStreamThreadTest > shouldDieOnInvalidOffsetExceptionDuringStartup SKIPPED ``` The test caused the build to get stuck and time out. Based on a successful local run this is also flaky. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #9011: KAFKA-10134: Still depends on existence of any fetchable partitions to block on join
guozhangwang commented on pull request #9011: URL: https://github.com/apache/kafka/pull/9011#issuecomment-665182983 That's what I was wondering as well. From the logs we have: https://issues.apache.org/jira/secure/attachment/13008127/consumer5.log.2020-07-22.log If you search for `Returning timer remaining` you can find that most of the time it is normal, and during that re-joining it does return 0, but that's expected since we could not connect to the broker during that period of time. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rhauch merged pull request #9089: KAFKA-10224: Update jersey license from CDDL to EPLv2
rhauch merged pull request #9089: URL: https://github.com/apache/kafka/pull/9089 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] tombentley commented on pull request #8312: KAFKA-9432 automated protocol for DescribeConfigs
tombentley commented on pull request #8312: URL: https://github.com/apache/kafka/pull/8312#issuecomment-664832499 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] tombentley commented on pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()
tombentley commented on pull request #9007: URL: https://github.com/apache/kafka/pull/9007#issuecomment-664836543 @mimaison done, thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from committing
ableegoldman commented on a change in pull request #8964: URL: https://github.com/apache/kafka/pull/8964#discussion_r461229173 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java ## @@ -38,13 +41,39 @@ */ final class StateManagerUtil { static final String CHECKPOINT_FILE_NAME = ".checkpoint"; +static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L; Review comment: Something like that. We know rocksdb will render the memtable immutable once it reaches the configured memtable size, after that it will flush once the number of immutable memtables reaches the configured value. Probably makes sense to align our checkpoint/flushing to the configured rocksdb flushing. Would be cool if we could piggy-back on the rocksdb options and avoid a new config in Streams altogether, but obviously not everyone uses rocksdb ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -454,6 +456,41 @@ public void flush() { } } +public void flushCache() { +RuntimeException firstException = null; +// attempting to flush the stores +if (!stores.isEmpty()) { +log.debug("Flushing all store caches registered in the state manager: {}", stores); +for (final StateStoreMetadata metadata : stores.values()) { +final StateStore store = metadata.stateStore; + +try { +// buffer should be flushed to send all records to changelog +if (store instanceof TimeOrderedKeyValueBuffer) { Review comment: So you're saying we'd still need to flush the suppression buffer but not the cache once we decouple caching from emitting? Or that we can remove this `flushCache` method altogether once that is done? Or that it will still do some flushing, but will not resemble the current `flushCache` method at all ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -454,6 +456,41 @@ public void flush() { } } +public void flushCache() { +RuntimeException firstException = null; +// attempting to flush the stores +if (!stores.isEmpty()) { +log.debug("Flushing all store caches registered in the state manager: {}", stores); +for (final StateStoreMetadata metadata : stores.values()) { +final StateStore store = metadata.stateStore; + +try { +// buffer should be flushed to send all records to changelog +if (store instanceof TimeOrderedKeyValueBuffer) { Review comment: Is there a ticket for that? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java ## @@ -38,13 +41,39 @@ */ final class StateManagerUtil { static final String CHECKPOINT_FILE_NAME = ".checkpoint"; +static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L; private StateManagerUtil() {} static RecordConverter converterForStore(final StateStore store) { return isTimestamped(store) ? 
rawValueToTimestampedValue() : identity(); } +static boolean checkpointNeeded(final boolean enforceCheckpoint, +final Map oldOffsetSnapshot, +final Map newOffsetSnapshot) { +// we should always have the old snapshot post completing the register state stores; +// if it is null it means the registration is not done and hence we should not overwrite the checkpoint +if (oldOffsetSnapshot == null) +return false; + +// if the previous snapshot is empty while the current snapshot is not then we should always checkpoint; +// note if the task is stateless or stateful but no stores logged, the snapshot would also be empty +// and hence it's okay to not checkpoint +if (oldOffsetSnapshot.isEmpty() && !newOffsetSnapshot.isEmpty()) +return true; + +// we can checkpoint if the the difference between the current and the previous snapshot is large enough +long totalOffsetDelta = 0L; +for (final Map.Entry entry : newOffsetSnapshot.entrySet()) { +totalOffsetDelta += Math.abs(oldOffsetSnapshot.getOrDefault(entry.getKey(), 0L) - entry.getValue()); +} + +// when enforcing checkpoint is required, we should overwrite the checkpoint if it is different from the old one; +// otherwise, we only overwrite the checkpoint if it is largely different from the old one +return enforceCheckpoint ? totalOffsetDelta > 0 : totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT; Review comment: I'm still not following -- I thought we no longer commit during c
[GitHub] [kafka] dajac commented on pull request #9072: KAFKA-10162; Make the rate based quota behave more like a Token Bucket (KIP-599, Part III)
dajac commented on pull request #9072: URL: https://github.com/apache/kafka/pull/9072#issuecomment-664923691 @apovzner Thanks for your comments. I have updated the PR to incorporate your feedback. I have also fixed a bug in unrecord and added the javadoc. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
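As a rough illustration of the spill-over behaviour described in the TokenBucket diffs above, here is a standalone sketch. It assumes a fixed number of windows and a fixed per-window quota, and it is not the actual TokenBucket/SampledStat implementation; the class and method names are made up.

```java
final class SpillOverWindows {
    private final double quotaPerWindow;
    private final double[] windows;

    SpillOverWindows(final double quotaPerWindow, final int numWindows) {
        this.quotaPerWindow = quotaPerWindow;
        this.windows = new double[numWindows];
    }

    // record a value into the current window; anything above the per-window
    // quota is carried over into the following windows, so a short burst
    // counts against future windows instead of being forgotten
    void record(final double value, final int currentWindow) {
        windows[currentWindow] += value;
        for (int i = currentWindow; i < windows.length - 1 && windows[i] > quotaPerWindow; i++) {
            final double extra = windows[i] - quotaPerWindow;
            windows[i] = quotaPerWindow;
            windows[i + 1] += extra;
        }
    }

    double valueAt(final int window) {
        return windows[window];
    }
}
```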
[GitHub] [kafka] cyrusv opened a new pull request #9093: Throw error when keys not found in FileConfigProvider
cyrusv opened a new pull request #9093: URL: https://github.com/apache/kafka/pull/9093 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
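The PR body above is only the template, but based on the title the change presumably resembles the following sketch: fail fast with a ConfigException when a requested key is absent from the properties file instead of silently returning an incomplete result. This is an assumption for illustration, not the actual FileConfigProvider patch, and the helper class name is hypothetical.

```java
import org.apache.kafka.common.config.ConfigException;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

final class StrictKeyLookup {

    // resolve the requested keys from an already-loaded Properties object,
    // throwing if any key is missing rather than returning a partial map
    static Map<String, String> resolve(final Properties props, final String path, final Set<String> keys) {
        final Map<String, String> data = new HashMap<>();
        for (final String key : keys) {
            final String value = props.getProperty(key);
            if (value == null) {
                throw new ConfigException("Key '" + key + "' not found in file " + path);
            }
            data.put(key, value);
        }
        return data;
    }
}
```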
[GitHub] [kafka] sasakitoa commented on pull request #9081: KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should not block infinitively
sasakitoa commented on pull request #9081: URL: https://github.com/apache/kafka/pull/9081#issuecomment-664725165 Thank you for the kind comments. I have updated the PR to improve the Javadoc and related test code. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] huxihx edited a comment on pull request #8984: KAFKA-10227: Enforce cleanup policy to only contain compact or delete once
huxihx edited a comment on pull request #8984: URL: https://github.com/apache/kafka/pull/8984#issuecomment-664804260 Ping @omkreddy @mimaison @dajac for review. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #9091: MINOR; Make KafkaAdminClientTest.testDescribeLogDirsPartialFailure and KafkaAdminClientTest.testAlterReplicaLogDirsPartialFailure test more reli
dajac commented on pull request #9091: URL: https://github.com/apache/kafka/pull/9091#issuecomment-664848664 @chia7712 Ack. I reverted `testMetadataRetries` back to the original version prior to #8864. That test is specific to the retries, so it doesn't make sense to remove the retries config. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #9086: FIX: Remove staticmethod tag to be able to use logger of instance
mjsax commented on pull request #9086: URL: https://github.com/apache/kafka/pull/9086#issuecomment-664684265 Merged to `trunk` and cherry-picked to `2.6`, `2.5`, `2.4`, and `2.3` branches. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #9012: KAFKA-10270: A broker to controller channel manager
cmccabe commented on a change in pull request #9012: URL: https://github.com/apache/kafka/pull/9012#discussion_r461884961 ## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala ## @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit} + +import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler} +import kafka.network.RequestChannel +import kafka.utils.Logging +import org.apache.kafka.clients._ +import org.apache.kafka.common.requests.AbstractRequest +import org.apache.kafka.common.utils.{LogContext, Time} +import org.apache.kafka.common.Node +import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.network._ +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.AbstractRequest.NoOpRequestBuilder +import org.apache.kafka.common.security.JaasContext + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * This class manages the connection between a broker and the controller. It runs a single + * {@link BrokerToControllerRequestThread} which uses the broker's metadata cache as its own metadata to find + * and connect to the controller. The channel is async and runs the network connection in the background. + * The maximum number of in-flight requests are set to one to ensure orderly response from the controller, therefore + * care must be taken to not block on outstanding requests for too long. 
+ */ +class BrokerToControllerChannelManager(metadataCache: kafka.server.MetadataCache, + time: Time, + metrics: Metrics, + config: KafkaConfig, + threadNamePrefix: Option[String] = None) extends Logging { + private val requestQueue = new LinkedBlockingQueue[BrokerToControllerQueueItem] + private val logContext = new LogContext(s"[broker-${config.brokerId}-to-controller] ") + private val manualMetadataUpdater = new ManualMetadataUpdater() + private val requestThread = newRequestThread + + def start(): Unit = { +requestThread.start() + } + + def shutdown(): Unit = { +requestThread.shutdown() +requestThread.awaitShutdown() + } + + private[server] def newRequestThread = { +val brokerToControllerListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName) +val brokerToControllerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol) + +val networkClient = { + val channelBuilder = ChannelBuilders.clientChannelBuilder( +brokerToControllerSecurityProtocol, +JaasContext.Type.SERVER, +config, +brokerToControllerListenerName, +config.saslMechanismInterBrokerProtocol, +time, +config.saslInterBrokerHandshakeRequestEnable, +logContext + ) + val selector = new Selector( +NetworkReceive.UNLIMITED, +Selector.NO_IDLE_TIMEOUT_MS, +metrics, +time, +"BrokerToControllerChannel", +Map("BrokerId" -> config.brokerId.toString).asJava, +false, +channelBuilder, +logContext + ) + new NetworkClient( +selector, +manualMetadataUpdater, +config.brokerId.toString, +1, +0, +0, +Selectable.USE_DEFAULT_BUFFER_SIZE, +Selectable.USE_DEFAULT_BUFFER_SIZE, +config.requestTimeoutMs, +config.connectionSetupTimeoutMs, +config.connectionSetupTimeoutMaxMs, +ClientDnsLookup.DEFAULT, Review comment: @abbccdda : Please switch this to `ClientDnsLookup.USE_ALL_DNS_IPS` to be consistent with the other NetworkClients, such as the one in `ControllerChannelManager`, etc. ## File path: core/src/main/scala/kafka/server/KafkaServer.scala ## @@ -168,6 +168,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP var kafkaController: KafkaController = null + var brokerToControllerChann
[GitHub] [kafka] rhauch commented on pull request #9051: KAFKA-10268: dynamic config like "--delete-config log.retention.ms" doesn't work
rhauch commented on pull request #9051: URL: https://github.com/apache/kafka/pull/9051#issuecomment-665124035 This was also cherry-picked to `2.6`, but that branch has been frozen while we try to release AK 2.6.0. However, given that this is low-risk, I'll leave it on `2.6` and update [KAFKA-10268](https://issues.apache.org/jira/browse/KAFKA-10268) instead. @huxihx, next time please refrain from cherry-picking to frozen branches. Thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics
ableegoldman commented on pull request #9094: URL: https://github.com/apache/kafka/pull/9094#issuecomment-665394762 call for review @vvcephei @cadonna @guozhangwang This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rhauch commented on pull request #9089: KAFKA-10224: Update jersey license from CDDL to EPLv2
rhauch commented on pull request #9089: URL: https://github.com/apache/kafka/pull/9089#issuecomment-665168618 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] codelabor opened a new pull request #9090: toString pattern change like ProducerRecord.java
codelabor opened a new pull request #9090: URL: https://github.com/apache/kafka/pull/9090 In most 'toString()' methods, '=' is used instead of ' = ', so I followed that convention. If you print a 'ProducerRecord' and a 'ConsumerRecord', the patterns look different, which looks strange. - producerRecord: ProducerRecord(topic=, partition=null, headers= ...) consumerRecord: ConsumerRecord(topic = , partition = 0, ..., headers = ...) *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
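For illustration, the convention the PR aligns on is "key=value" with no spaces around '='. The class below is hypothetical and is not the actual record class; it only shows the formatting style being adopted.

```java
final class ExampleRecord {
    private final String topic;
    private final Integer partition;

    ExampleRecord(final String topic, final Integer partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public String toString() {
        // "key=value" without surrounding spaces, matching ProducerRecord's existing style
        return "ExampleRecord(topic=" + topic + ", partition=" + partition + ")";
    }
}
```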
[GitHub] [kafka] hachikuji commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
hachikuji commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r461114945 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -292,21 +278,36 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, while (offset < highWatermark) { try { -final ConsumerRecords records = globalConsumer.poll(pollTime); +final ConsumerRecords records = + globalConsumer.poll(pollTimePlusRequestTimeoutPlusTaskTimeout); Review comment: It does seem a bit weird here to add in the request timeout. Not sure I follow the reasoning behind that... This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rgibaiev commented on pull request #8575: KAFKA-8713 KIP-581 Add accept.optional.null to solve optional null
rgibaiev commented on pull request #8575: URL: https://github.com/apache/kafka/pull/8575#issuecomment-665225469 Any update? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #9062: KAFKA-8098: fix the flaky test by disabling the auto commit to avoid member rejoining
showuon commented on pull request #9062: URL: https://github.com/apache/kafka/pull/9062#issuecomment-664801932 hi @feyman2016 @huxihx , could you help review this small PR? Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #9092: KAFKA-10163; Define `controller_mutation_rate` a Double instead of a Long
dajac commented on pull request #9092: URL: https://github.com/apache/kafka/pull/9092#issuecomment-664996150 @rajinisivaram I have updated an existing test to use 1.5 instead of 2 to verify this. https://github.com/apache/kafka/pull/9092/files#diff-296d7b93103356535b8b891f019e4651R113. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #9004: KAFKA-10261: Introduce the KIP-478 apis with shims
vvcephei commented on pull request #9004: URL: https://github.com/apache/kafka/pull/9004#issuecomment-664740837 Thanks for the review, @abbccdda ! I've addressed your feedback. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
vvcephei commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r461143961 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, final RecordConverter recordConverter) { for (final TopicPartition topicPartition : topicPartitions) { globalConsumer.assign(Collections.singletonList(topicPartition)); +long offset; final Long checkpoint = checkpointFileCache.get(topicPartition); if (checkpoint != null) { globalConsumer.seek(topicPartition, checkpoint); +offset = checkpoint; } else { globalConsumer.seekToBeginning(Collections.singletonList(topicPartition)); +final AtomicLong position = new AtomicLong(); +retryUntilSuccessOrThrowOnTaskTimeout( +() -> position.set(globalConsumer.position(topicPartition)), +String.format( +"Failed to get position for partition %s. The broker may be transiently unavailable at the moment.", +topicPartition +) +); +offset = position.get(); } -long offset = globalConsumer.position(topicPartition); final Long highWatermark = highWatermarks.get(topicPartition); final RecordBatchingStateRestoreCallback stateRestoreAdapter = StateRestoreCallbackAdapter.adapt(stateRestoreCallback); stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark); long restoreCount = 0L; +long deadlineMs = NO_DEADLINE; while (offset < highWatermark) { try { final ConsumerRecords records = globalConsumer.poll(pollTime); +if (records.isEmpty()) { +if (taskTimeoutMs == 0L) { +deadlineMs = maybeUpdateDeadlineOrThrow( Review comment: Ok, thanks @mjsax . I just traced though the consumer code again, and have finally been able to see what you already knew: that `request.timeout.ms` is indeed the correct amount of time to wait. 
Namely, we send a fetch here: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1285 Which calls through to client.send here: https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L263 Which fills in the `request.timeout.ms` config value here: https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L106 Which uses it to construct a ClientRequest here: https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L129-L130 Which then gets used to create an InFlightRequest when it gets sent here: https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L1248 Which is later used to detect expired requests here: https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java#L162 Which is used to list nodes (brokers) for which there is an expired request here: https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java#L179 Which is then processed as a "disconnection" here: https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L803 It also looks like the KafkaClient just does a tight-loop checking for a network response, so we don't really need any extra time to account for sampling errors. Also, it still seems like using the sum as the poll duration is just as good as using your retry logic, so I think the duration parameter is fine. My only remaining question, which maybe doesn't really matter one way or another, is whether `poll.ms` really belongs here or not. It seems like the desired semantics are accomplished by just waiting `request.timeout.ms` for the initial failure, and then an extra `task.timeout.ms` for any retries. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -292,21 +278,36 @@
[GitHub] [kafka] chia7712 edited a comment on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 edited a comment on pull request #8657: URL: https://github.com/apache/kafka/pull/8657#issuecomment-665149860 ```streams_standby_replica_test``` -> https://issues.apache.org/jira/browse/KAFKA-10287 I will take a look at ```streams_broker_bounce_test``` (https://issues.apache.org/jira/browse/KAFKA-10292) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on pull request #9087: HOTFIX: Set session timeout and heartbeat interval to default to decrease flakiness
cadonna commented on pull request #9087: URL: https://github.com/apache/kafka/pull/9087#issuecomment-664621717 @abbccdda Looking at this test class again, I even think that we could set the session timeout and heartbeat interval to their defaults for all tests. The main motivation to reduce them was to avoid having to change the application ID for each test (see https://github.com/apache/kafka/pull/8530#discussion_r413170584). However, in the meantime each test has its own application ID, so I guess it would be fine to use the defaults again here. Is this correct @guozhangwang ? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on pull request #9012: KAFKA-10270: A broker to controller channel manager
abbccdda commented on pull request #9012: URL: https://github.com/apache/kafka/pull/9012#issuecomment-665410633 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on a change in pull request #9004: KAFKA-10261: Introduce the KIP-478 apis with adapters
abbccdda commented on a change in pull request #9004: URL: https://github.com/apache/kafka/pull/9004#discussion_r462057744 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ## @@ -764,12 +764,12 @@ private void connectProcessorAndStateStore(final String processorName, if (!sourceTopics.isEmpty()) { stateStoreNameToSourceTopics.put(stateStoreName, -Collections.unmodifiableSet(sourceTopics)); + Collections.unmodifiableSet(sourceTopics)); Review comment: format looks weird, maybe just do 4 spaces ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java ## @@ -57,12 +57,12 @@ public StateStore getStateStore(final String name) { @SuppressWarnings("unchecked") @Override -public void forward(final K key, final V value) { -final ProcessorNode previousNode = currentNode(); +public void forward(final KIn key, final VIn value) { +final ProcessorNode previousNode = currentNode(); try { -for (final ProcessorNode child : currentNode().children()) { +for (final ProcessorNode child : currentNode().children()) { setCurrentNode(child); -((ProcessorNode) child).process(key, value); +((ProcessorNode) child).process(key, value); // FIXME Review comment: Could we leave a more clear comment on what needs to be fixed? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java ## @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.api; + +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.StreamsMetrics; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.processor.Cancellable; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; +import org.apache.kafka.streams.processor.StateRestoreCallback; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.TimestampExtractor; +import org.apache.kafka.streams.processor.To; + +import java.io.File; +import java.time.Duration; +import java.util.Map; + +/** + * Processor context interface. + * + * @param a bound on the types of keys that may be forwarded + * @param a bound on the types of values that may be forwarded + */ +public interface ProcessorContext { + +/** + * Returns the application id. + * + * @return the application id + */ +String applicationId(); + +/** + * Returns the task id. + * + * @return the task id + */ +TaskId taskId(); + +/** + * Returns the default key serde. 
+ * + * @return the key serializer + */ +Serde keySerde(); + +/** + * Returns the default value serde. + * + * @return the value serializer + */ +Serde valueSerde(); + +/** + * Returns the state directory for the partition. + * + * @return the state directory + */ +File stateDir(); + +/** + * Returns Metrics instance. + * + * @return StreamsMetrics + */ +StreamsMetrics metrics(); + +/** + * Registers and possibly restores the specified storage engine. + * + * @param store the storage engine + * @param stateRestoreCallback the restoration callback logic for log-backed state stores upon restart + * + * @throws IllegalStateException If store gets registered after initialized is already finished + * @throws StreamsException if the store's change log does not contain the partition + */ +void register(final StateStore store, + final StateRestoreCallback stateRestoreCallback); + +/** + * Get the state store given the store name. + * + * @pa
[GitHub] [kafka] cmccabe commented on pull request #9012: KAFKA-10270: A broker to controller channel manager
cmccabe commented on pull request #9012: URL: https://github.com/apache/kafka/pull/9012#issuecomment-665288947 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman opened a new pull request #9094: KAFKA-10054: add TRACE-level e2e latency metrics
ableegoldman opened a new pull request #9094: URL: https://github.com/apache/kafka/pull/9094 Adds avg, min, and max e2e latency metrics at the new TRACE level. Also adds the missing `avg` task-level metric at the INFO level. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda edited a comment on pull request #9012: KAFKA-10270: A broker to controller channel manager
abbccdda edited a comment on pull request #9012: URL: https://github.com/apache/kafka/pull/9012#issuecomment-665410633 Got 2/3 green, with one jenkins job terminated unexpectedly. https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/7602/console This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on pull request #9081: KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should not block infinitively
abbccdda commented on pull request #9081: URL: https://github.com/apache/kafka/pull/9081#issuecomment-665157437 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on a change in pull request #9081: KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should not block infinitively
abbccdda commented on a change in pull request #9081: URL: https://github.com/apache/kafka/pull/9081#discussion_r461313377 ## File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala ## @@ -406,6 +406,26 @@ class TransactionsTest extends KafkaServerTestHarness { TestUtils.waitUntilTrue(() => offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot read committed offset") } + @Test(expected = classOf[TimeoutException]) + def testSendOffsetsToTransactionTimeout(): Unit = { +val producer = createTransactionalProducer("transactionProducer", maxBlockMs = 1000) +producer.initTransactions() +producer.beginTransaction() +producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, "foo".getBytes, "bar".getBytes)) + +for (i <- 0 until servers.size) Review comment: nit: could use servers.indices ## File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala ## @@ -406,6 +406,26 @@ class TransactionsTest extends KafkaServerTestHarness { TestUtils.waitUntilTrue(() => offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot read committed offset") } + @Test(expected = classOf[TimeoutException]) + def testSendOffsetsToTransactionTimeout(): Unit = { +val producer = createTransactionalProducer("transactionProducer", maxBlockMs = 1000) +producer.initTransactions() +producer.beginTransaction() +producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, "foo".getBytes, "bar".getBytes)) + +for (i <- 0 until servers.size) + killBroker(i) + +try { + producer.sendOffsetsToTransaction(Map( Review comment: Do we have unit test coverage for other transaction API max blocking as well? Do you mind adding them as separate tests and share the same module? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #9047: KAFKA-9274: Remove `retries` for global task
mjsax commented on pull request #9047: URL: https://github.com/apache/kafka/pull/9047#issuecomment-664773350 @vvcephei @abbccdda @guozhangwang @hachikuji -- I updated this PR according to your discussions. Needed to squash for rebasing to resolve conflicts. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #9004: KAFKA-10261: Introduce the KIP-478 apis with shims
vvcephei commented on a change in pull request #9004: URL: https://github.com/apache/kafka/pull/9004#discussion_r461271276 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ## @@ -880,40 +880,41 @@ public synchronized ProcessorTopology buildGlobalStateTopology() { return globalGroups; } +@SuppressWarnings("unchecked") private ProcessorTopology build(final Set nodeGroup) { Objects.requireNonNull(applicationId, "topology has not completed optimization"); -final Map> processorMap = new LinkedHashMap<>(); -final Map> topicSourceMap = new HashMap<>(); -final Map> topicSinkMap = new HashMap<>(); +final Map> processorMap = new LinkedHashMap<>(); +final Map> topicSourceMap = new HashMap<>(); +final Map> topicSinkMap = new HashMap<>(); final Map stateStoreMap = new LinkedHashMap<>(); final Set repartitionTopics = new HashSet<>(); // create processor nodes in a topological order ("nodeFactories" is already topologically sorted) // also make sure the state store map values following the insertion ordering -for (final NodeFactory factory : nodeFactories.values()) { +for (final NodeFactory factory : nodeFactories.values()) { if (nodeGroup == null || nodeGroup.contains(factory.name)) { -final ProcessorNode node = factory.build(); +final ProcessorNode node = factory.build(); processorMap.put(node.name(), node); if (factory instanceof ProcessorNodeFactory) { buildProcessorNode(processorMap, stateStoreMap, - (ProcessorNodeFactory) factory, - node); + (ProcessorNodeFactory) factory, + (ProcessorNode) node); } else if (factory instanceof SourceNodeFactory) { buildSourceNode(topicSourceMap, repartitionTopics, -(SourceNodeFactory) factory, -(SourceNode) node); +(SourceNodeFactory) factory, +(SourceNode) node); } else if (factory instanceof SinkNodeFactory) { buildSinkNode(processorMap, topicSinkMap, repartitionTopics, - (SinkNodeFactory) factory, - (SinkNode) node); + (SinkNodeFactory) factory, + (SinkNode) node); Review comment: They have subtly different meanings, which I'm not 100% clear on all the time. I'm not sure if I had to change this one, of if it was an accident. I'll give it a closer look. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorShim.java ## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + + +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; + +public final class ProcessorShim implements Processor { Review comment: I think "adapter" is the standard design pattern name for this type of thing. Not sure why I thought "shim" was a good choice in the heat of the moment. 
Maybe because I'm kind of slipping these classes in the middle to make everything line up? I can change them to "adapter". ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ## @@ -880,40 +880,41 @@ public synchronized ProcessorTopology buildGlobalStateTopology() { return globalGroups; } +@SuppressWarnings("unchecked") private ProcessorTopology build(final Set nodeGroup) { Objects.requireNonNull
[GitHub] [kafka] cyrusv closed pull request #9093: Throw error on when keys not found in FileConfigProvider
cyrusv closed pull request #9093: URL: https://github.com/apache/kafka/pull/9093 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets
chia7712 commented on pull request #9066: URL: https://github.com/apache/kafka/pull/9066#issuecomment-664764918 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #9096: Add comments to constrainedAssign and generalAssign method
showuon commented on pull request #9096: URL: https://github.com/apache/kafka/pull/9096#issuecomment-665472409 @ableegoldman @vahidhashemian , please help review this PR to add comments to the assign methods. Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac opened a new pull request #9092: KAFKA-10163; Define `controller_mutation_rate` a Double instead of a Long
dajac opened a new pull request #9092: URL: https://github.com/apache/kafka/pull/9092 First tests have shown that `controller_mutation_rate` can be quite low (e.g. around 1) in clusters with multiple tenants. At the moment, the rate is defined as a Long which limits the possible low values. Using a Double seems more appropriate. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
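For illustration only, defining a quota-style rate as a DOUBLE with Kafka's `ConfigDef` could look like the sketch below; the default value and importance are assumptions, not the actual broker definition:
```java
import org.apache.kafka.common.config.ConfigDef;

// Illustrative only: a DOUBLE config allows fractional rates (e.g. 0.5 mutations/sec),
// which a LONG cannot express. The default and importance below are assumptions.
public class MutationRateConfigSketch {
    public static final ConfigDef CONFIG = new ConfigDef()
        .define("controller_mutation_rate",
                ConfigDef.Type.DOUBLE,
                Double.MAX_VALUE,                 // hypothetical "unlimited" default
                ConfigDef.Range.atLeast(0.0),
                ConfigDef.Importance.MEDIUM,
                "Maximum rate of topic/partition mutations allowed for a principal.");
}
```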
[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on pull request #8657: URL: https://github.com/apache/kafka/pull/8657#issuecomment-665149860 ```streams_standby_replica_test``` -> https://issues.apache.org/jira/browse/KAFKA-10287 I will take a look at ```streams_broker_bounce_test``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
hachikuji commented on a change in pull request #9008: URL: https://github.com/apache/kafka/pull/9008#discussion_r461173162 ## File path: clients/src/main/resources/common/message/FetchRequest.json ## @@ -55,35 +55,35 @@ "about": "The minimum bytes to accumulate in the response." }, { "name": "MaxBytes", "type": "int32", "versions": "3+", "default": "0x7fff", "ignorable": true, "about": "The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored." }, -{ "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": "0", "ignorable": false, +{ "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": "0", "ignorable": true, Review comment: I guess the implicit expectation is that if the protocol does not support the `read_committed` isolation level, then it wouldn't have transactional data anyway, so reverting to `read_uncommitted` is safe. Can't find a fault with that. ## File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java ## @@ -146,7 +147,7 @@ public static AbstractRequest parseRequest(ApiKeys apiKey, short apiVersion, Str case PRODUCE: return new ProduceRequest(struct, apiVersion); case FETCH: -return new FetchRequest(struct, apiVersion); +return new FetchRequest(new FetchRequestData(struct, apiVersion), apiVersion); Review comment: nit: any reason not to stick with the same constructor convention as the other requests? ## File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java ## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.protocol; + +import org.apache.kafka.common.network.ByteBufferSend; +import org.apache.kafka.common.network.Send; +import org.apache.kafka.common.record.BaseRecords; +import org.apache.kafka.common.utils.ByteUtils; + +import java.io.DataOutput; +import java.nio.ByteBuffer; +import java.util.function.Consumer; + +/** + * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer + * of data from a record-set's file channel to the eventual socket channel. + * + * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array + * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written + * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes, + * another Send is passed to the consumer which wraps the underlying record-set's transfer logic. 
+ * + * For example, + * + * + * recordsWritable.writeInt(10); + * recordsWritable.writeRecords(records1); + * recordsWritable.writeInt(20); + * recordsWritable.writeRecords(records2); + * recordsWritable.writeInt(30); + * recordsWritable.flush(); + * + * + * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any + * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is + * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}. + * + * @see org.apache.kafka.common.requests.FetchResponse + */ +public class RecordsWriter implements Writable { +private final String dest; +private final Consumer sendConsumer; +private final ByteBuffer buffer; +private int mark; + +public RecordsWriter(String dest, int totalSize, Consumer sendConsumer) { Review comment: Could we rename `totalSize` so that it is clear that it does not cover the record sizes. Maybe `totalOverheadSize` or `totalNonRecordSize` or something like that. ## File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java ## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information r
[GitHub] [kafka] abbccdda commented on pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
abbccdda commented on pull request #9001: URL: https://github.com/apache/kafka/pull/9001#issuecomment-665189199 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bob-barrett commented on a change in pull request #9054: KAFKA-10282: Remove Log metrics immediately when deleting log
bob-barrett commented on a change in pull request #9054: URL: https://github.com/apache/kafka/pull/9054#discussion_r461721737 ## File path: core/src/main/scala/kafka/log/LogManager.scala ## @@ -932,6 +927,7 @@ class LogManager(logDirs: Seq[File], val logsToCheckpoint = logsInDir(logDir) checkpointRecoveryOffsetsAndCleanSnapshotsInDir(logDir, logsToCheckpoint, ArrayBuffer.empty) checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint) +sourceLog.removeLogMetrics() Review comment: Good catch. Opened https://issues.apache.org/jira/browse/KAFKA-10320 to track it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed [WIP]
guozhangwang commented on pull request #9083: URL: https://github.com/apache/kafka/pull/9083#issuecomment-664699699 cc @ableegoldman @mjsax This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
abbccdda commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r462047068 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -274,30 +252,74 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, final RecordConverter recordConverter) { for (final TopicPartition topicPartition : topicPartitions) { globalConsumer.assign(Collections.singletonList(topicPartition)); +long offset; final Long checkpoint = checkpointFileCache.get(topicPartition); if (checkpoint != null) { globalConsumer.seek(topicPartition, checkpoint); +offset = checkpoint; } else { globalConsumer.seekToBeginning(Collections.singletonList(topicPartition)); +offset = retryUntilSuccessOrThrowOnTaskTimeout( +() -> globalConsumer.position(topicPartition), +String.format( +"Failed to get position for partition %s. The broker may be transiently unavailable at the moment.", +topicPartition +) +); } -long offset = globalConsumer.position(topicPartition); final Long highWatermark = highWatermarks.get(topicPartition); final RecordBatchingStateRestoreCallback stateRestoreAdapter = StateRestoreCallbackAdapter.adapt(stateRestoreCallback); stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark); long restoreCount = 0L; -while (offset < highWatermark) { -final ConsumerRecords records = globalConsumer.poll(pollTime); +while (offset < highWatermark) { // when we "fix" this loop (KAFKA-7380 / KAFKA-10317) + // we should update the `poll()` timeout below + +// we ignore `poll.ms` config during bootstrapping phase and +// apply `request.timeout.ms` plus `task.timeout.ms` instead +// +// the reason is, that `poll.ms` might be too short to give a fetch request a fair chance +// to actually complete and we don't want to start `task.timeout.ms` too early +// +// we also pass `task.timeout.ms` into `poll()` directly right now as it simplifies our own code: +// if we don't pass it in, we would just track the timeout ourselves and call `poll()` again +// in our own retry loop; by passing the timeout we can reuse the consumer's internal retry loop instead +// +// note that using `request.timeout.ms` provides a conservative upper bound for the timeout; +// this implies that we might start `task.timeout.ms` "delayed" -- however, starting the timeout +// delayed is preferable (as it's more robust) than starting it too early +// +// TODO https://issues.apache.org/jira/browse/KAFKA-10315 +// -> do a more precise timeout handling if `poll` would throw an exception if a fetch request fails +// (instead of letting the consumer retry fetch requests silently) +// +// TODO https://issues.apache.org/jira/browse/KAFKA-10317 and +// https://issues.apache.org/jira/browse/KAFKA-7380 +// -> don't pass in `task.timeout.ms` to stay responsive if `KafkaStreams#close` gets called +final ConsumerRecords records = globalConsumer.poll(requestTimeoutPlusTaskTimeout); +if (records.isEmpty()) { +// this will always throw +maybeUpdateDeadlineOrThrow(time.milliseconds()); Review comment: Could we just throw here? ## File path: docs/streams/developer-guide/config-streams.html ## @@ -326,13 +321,18 @@ bootstrap.serversstate.cleanup.delay.ms Low The amount of time in milliseconds to wait before deleting state when a partition has migrated. -60 milliseconds +60 milliseconds (10 minutes) state.dir High Directory location for state stores. 
/tmp/kafka-streams + task.timeout.ms +Medium Review comment: @mjsax could we do a screenshot to make sure it looks good on the web-page? ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java ## @@ -671,19 +1211,21 @@ private void writeCorruptCheckpoint() throws IOException { } } -private void initializeCons
[GitHub] [kafka] vvcephei commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets
vvcephei commented on pull request #9066: URL: https://github.com/apache/kafka/pull/9066#issuecomment-665126186 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yeralin commented on pull request #6592: KAFKA-8326: Introduce List Serde
yeralin commented on pull request #6592: URL: https://github.com/apache/kafka/pull/6592#issuecomment-665078937 Any updates? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] sasakitoa commented on a change in pull request #9081: KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should not block infinitively
sasakitoa commented on a change in pull request #9081: URL: https://github.com/apache/kafka/pull/9081#discussion_r461264833 ## File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala ## @@ -406,6 +406,26 @@ class TransactionsTest extends KafkaServerTestHarness { TestUtils.waitUntilTrue(() => offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot read committed offset") } + @Test(expected = classOf[TimeoutException]) + def testSendOffsetsToTransactionTimeout(): Unit = { +val producer = createTransactionalProducer("transactionProducer", maxBlockMs = 1000) +producer.initTransactions() +producer.beginTransaction() +producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, "foo".getBytes, "bar".getBytes)) + +for (i <- 0 until servers.size) + killBroker(i) + +val offsets = new mutable.HashMap[TopicPartition, OffsetAndMetadata]().asJava +offsets.put(new TopicPartition(topic1, 0), new OffsetAndMetadata(0)) +try { + producer.sendOffsetsToTransaction(offsets, "test-group") Review comment: Replaced from `mutable.HashMap` to Map ## File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ## @@ -687,7 +687,7 @@ public void sendOffsetsToTransaction(Map offs throwIfProducerClosed(); TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(offsets, groupMetadata); sender.wakeup(); -result.await(); +result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS); Review comment: I wrote some description related to TimeoutException and InterruptedException ## File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala ## @@ -406,6 +406,26 @@ class TransactionsTest extends KafkaServerTestHarness { TestUtils.waitUntilTrue(() => offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot read committed offset") } + @Test(expected = classOf[TimeoutException]) + def testSendOffsetsToTransactionTimeout(): Unit = { +val producer = createTransactionalProducer("transactionProducer", maxBlockMs = 1000) +producer.initTransactions() +producer.beginTransaction() +producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, "foo".getBytes, "bar".getBytes)) + +for (i <- 0 until servers.size) + killBroker(i) + +val offsets = new mutable.HashMap[TopicPartition, OffsetAndMetadata]().asJava +offsets.put(new TopicPartition(topic1, 0), new OffsetAndMetadata(0)) +try { + producer.sendOffsetsToTransaction(offsets, "test-group") Review comment: Replaced from `mutable.HashMap` to `Map` ## File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala ## @@ -406,6 +406,26 @@ class TransactionsTest extends KafkaServerTestHarness { TestUtils.waitUntilTrue(() => offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot read committed offset") } + @Test(expected = classOf[TimeoutException]) + def testSendOffsetsToTransactionTimeout(): Unit = { +val producer = createTransactionalProducer("transactionProducer", maxBlockMs = 1000) +producer.initTransactions() +producer.beginTransaction() +producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, "foo".getBytes, "bar".getBytes)) + +for (i <- 0 until servers.size) Review comment: Modified to use from `size` to `indices`, thanks. 
## File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala ## @@ -406,6 +406,26 @@ class TransactionsTest extends KafkaServerTestHarness { TestUtils.waitUntilTrue(() => offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot read committed offset") } + @Test(expected = classOf[TimeoutException]) + def testSendOffsetsToTransactionTimeout(): Unit = { +val producer = createTransactionalProducer("transactionProducer", maxBlockMs = 1000) +producer.initTransactions() +producer.beginTransaction() +producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, "foo".getBytes, "bar".getBytes)) + +for (i <- 0 until servers.size) + killBroker(i) + +try { + producer.sendOffsetsToTransaction(Map( Review comment: I added some timeout tests for initTransaction, commitTransction, abortTransaction using same base method. Is this implementation correct what you intended? ## File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala ## @@ -406,6 +406,26 @@ class TransactionsTest extends KafkaServerTestHarness { TestUtils.waitUntilTrue(() => offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot read committed offset") } + @Test(expected = classOf[TimeoutException]) + def testSendOffsetsToTransactionTimeout(): Unit = { +val producer = cre
[GitHub] [kafka] rajinisivaram commented on pull request #9092: KAFKA-10163; Define `controller_mutation_rate` a Double instead of a Long
rajinisivaram commented on pull request #9092: URL: https://github.com/apache/kafka/pull/9092#issuecomment-664990134 ok to test This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #6592: KAFKA-8326: Introduce List Serde
mjsax commented on pull request #6592: URL: https://github.com/apache/kafka/pull/6592#issuecomment-665304810 @yeralin -- your PR is in my review queue. Not sure how quickly I will find time to have a look though atm -- maybe next week, but I can't promise. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #9081: KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should not block infinitively
guozhangwang commented on pull request #9081: URL: https://github.com/apache/kafka/pull/9081#issuecomment-664690277 Any blocking APIs should be covered by the `max.block.ms` so I think this is rather a bug-fix. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
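As a sketch of that idea (bounding the wait by `max.block.ms` instead of waiting indefinitely), assuming a simple latch-based result rather than the actual `TransactionalRequestResult` internals:
```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.errors.TimeoutException;

// Sketch only: wait for the transactional result at most max.block.ms, then fail.
final class BoundedAwaitSketch {
    void awaitResult(final CountDownLatch completion, final long maxBlockMs) throws InterruptedException {
        if (!completion.await(maxBlockMs, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("sendOffsetsToTransaction timed out after " + maxBlockMs + " ms");
        }
    }
}
```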
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r461302625 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, final RecordConverter recordConverter) { for (final TopicPartition topicPartition : topicPartitions) { globalConsumer.assign(Collections.singletonList(topicPartition)); +long offset; final Long checkpoint = checkpointFileCache.get(topicPartition); if (checkpoint != null) { globalConsumer.seek(topicPartition, checkpoint); +offset = checkpoint; } else { globalConsumer.seekToBeginning(Collections.singletonList(topicPartition)); +final AtomicLong position = new AtomicLong(); +retryUntilSuccessOrThrowOnTaskTimeout( +() -> position.set(globalConsumer.position(topicPartition)), +String.format( +"Failed to get position for partition %s. The broker may be transiently unavailable at the moment.", +topicPartition +) +); +offset = position.get(); } -long offset = globalConsumer.position(topicPartition); final Long highWatermark = highWatermarks.get(topicPartition); final RecordBatchingStateRestoreCallback stateRestoreAdapter = StateRestoreCallbackAdapter.adapt(stateRestoreCallback); stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark); long restoreCount = 0L; +long deadlineMs = NO_DEADLINE; while (offset < highWatermark) { try { final ConsumerRecords records = globalConsumer.poll(pollTime); +if (records.isEmpty()) { +if (taskTimeoutMs == 0L) { +deadlineMs = maybeUpdateDeadlineOrThrow( Review comment: Thanks for digging into it @vvcephei -- The question about `pollTimeout` is fair. I guess for default configs it might not matter much, as the default is 100ms IIRC. At the same time, we apply the same `pollTimeout` during regular processing, and as a matter of fact, for this case, a use might want to use a longer poll-timeout, as otherwise, the thread would just "busy wait" anyway (only the responsiveness for a shutdown of the app should be considered). Thus, it might actually make sense to exclude `pollTimeout` completely and only use `requestTimeout + taskTimeout`. Again, using `taskTimeout` in `poll()` reduces the responsiveness of a shutdown -- however, atm during bootstrapping we ignore a shutdown signal anyway, hence, for now we don't make the situation worse. I create a ticket to fix this: https://issues.apache.org/jira/browse/KAFKA-10317 and will add a comment for now. Short related note: actually using `requestTimeout` seems to be a conservative upper bound for poll(). A request could fail with a different error before `requestTimeout` hits and would be retried internally for this case -- if this happens, we might want to start the `taskTimeout` earlier. However, we don't have any means atm to detect this case. Thus, using `requestTimeout` is the best option we have right now (because triggering `taskTimeout` too early seems to be worse than triggering it too late). I created a ticket though that might allow us to improve the code later: https://issues.apache.org/jira/browse/KAFKA-10315 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
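For illustration, the deadline bookkeeping discussed above can be sketched as follows; the names mirror the diff but the code is an assumption, not the exact implementation in #9047:
```java
import org.apache.kafka.common.errors.TimeoutException;

// Sketch of the task-timeout deadline handling used while bootstrapping a global store.
final class TaskTimeoutSketch {
    static final long NO_DEADLINE = -1L;
    private final long taskTimeoutMs;

    TaskTimeoutSketch(final long taskTimeoutMs) {
        this.taskTimeoutMs = taskTimeoutMs;
    }

    // Called when a poll returns no records: start the deadline on the first failure,
    // and throw once task.timeout.ms has expired.
    long maybeUpdateDeadlineOrThrow(final long currentDeadlineMs, final long nowMs) {
        if (currentDeadlineMs == NO_DEADLINE) {
            return nowMs + taskTimeoutMs; // first failure: start the clock
        }
        if (nowMs >= currentDeadlineMs) {
            throw new TimeoutException("global task did not make progress within task.timeout.ms");
        }
        return currentDeadlineMs; // keep the existing deadline
    }
}
```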
[GitHub] [kafka] abbccdda opened a new pull request #9095: KAFKA-10321: fix infinite blocking for global stream thread startup
abbccdda opened a new pull request #9095: URL: https://github.com/apache/kafka/pull/9095 In the unit test `shouldDieOnInvalidOffsetExceptionDuringStartup` for JDK 11, we spotted a case where a global stream thread startup would stall if it fails immediately upon the first poll. The reason is that `start()` function only checks whether the thread is *not running*, as it needs to block until it finishes the initialization. However, if the thread transits to `DEAD` immediately, the `start()` call would block forever. Use the failed unit test to verify it works. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
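A minimal sketch of the fix described above (block until the thread is either running or dead, instead of only checking that it is not yet running); all names are illustrative and this is not the actual `GlobalStreamThread` code:
```java
// Illustrative sketch only; not the actual GlobalStreamThread implementation.
final class StartupLatchSketch {
    private final Object stateLock = new Object();
    private boolean running = false;
    private boolean dead = false;

    void start() throws InterruptedException {
        startBackgroundThread();
        synchronized (stateLock) {
            // Wait until the thread is RUNNING *or* has transitioned to DEAD,
            // so a startup failure no longer blocks the caller forever.
            while (!running && !dead) {
                stateLock.wait();
            }
            if (dead) {
                throw new IllegalStateException("Global stream thread died during startup");
            }
        }
    }

    private void startBackgroundThread() {
        new Thread(() -> {
            try {
                // ... initialization work, e.g. the first poll, would go here ...
                synchronized (stateLock) { running = true; stateLock.notifyAll(); }
            } catch (final RuntimeException fatal) {
                synchronized (stateLock) { dead = true; stateLock.notifyAll(); }
            }
        }).start();
    }
}
```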
[GitHub] [kafka] pan3793 commented on pull request #8575: KAFKA-8713 KIP-581 Add accept.optional.null to solve optional null
pan3793 commented on pull request #8575: URL: https://github.com/apache/kafka/pull/8575#issuecomment-665405076 Fixed the mail thread. Do you have any comments on the discussion and the KIP doc? @rhauch This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #9094: KAFKA-10054: add TRACE-level e2e latency metrics
ableegoldman commented on a change in pull request #9094: URL: https://github.com/apache/kafka/pull/9094#discussion_r461993688 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java ## @@ -198,7 +198,7 @@ .define(METRICS_RECORDING_LEVEL_CONFIG, Type.STRING, Sensor.RecordingLevel.INFO.toString(), - in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString()), + in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString(), Sensor.RecordingLevel.TRACE.toString()), Review comment: It's kind of a bummer that we can't just add the new TRACE level for Streams only; we have to add it to all the clients that Streams passes its configs down to. We could check for the new TRACE level and strip it off before passing the configs on to the clients, but that just seems like asking for trouble. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java ## @@ -88,13 +92,6 @@ private TaskMetrics() {} private static final String NUM_BUFFERED_RECORDS_DESCRIPTION = "The count of buffered records that are polled " + "from consumer and not yet processed for this active task"; -private static final String RECORD_E2E_LATENCY = "record-e2e-latency"; Review comment: Moved the common descriptions to StreamsMetricsImpl ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -227,6 +234,14 @@ protected Bytes keyBytes(final K key) { return byteEntries; } +private void maybeRecordE2ELatency() { +if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) { Review comment: For KV stores, we just compare the current time with the current record's timestamp ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java ## @@ -248,4 +253,12 @@ public void close() { private Bytes keyBytes(final K key) { return Bytes.wrap(serdes.rawKey(key)); } + +private void maybeRecordE2ELatency() { +if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) { Review comment: For session and window stores, we also just compare the current time with the current record's timestamp when `put` is called. This can mean the e2e latency is measured several times on the same record, for example in a windowed aggregation. At first I thought that didn't make sense, but now I think it's actually exactly what we want. First of all, it means we can actually account for the latency between calls to `put` within a processor. For simple point inserts this might not be a huge increase on the scale of ms, but more complex processing may benefit from seeing this granularity of information. If they don't want it, well, that's why we introduced `TRACE` Second, while it might seem like we're over-weighting some records by measuring the e2e latency on them more than others, I'm starting to think this actually makes more sense than not: the big picture benefit/use case for the e2e latency metric is less "how long for this record to get sent downstream" and more "how long for this record to be reflected in the state store/IQ results". Given that, each record should be weighted by its actual proportion of the state store. You aren't querying individual records (in a window store), you're querying the windows themselves I toyed around with the idea of measuring the e2e latency relative to the window time, instead of the record timestamp, but ultimately couldn't find any sense in that. Thoughts? 
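A simplified sketch of that measurement, assuming a metered store that holds a `Sensor` and a `Time` instance (names are illustrative, not the actual `MeteredKeyValueStore` code):
```java
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;

// Sketch: e2e latency is taken as wall-clock time minus the record's timestamp on each put.
final class E2eLatencySketch {
    private final Sensor e2eLatencySensor;
    private final Time time;

    E2eLatencySketch(final Sensor e2eLatencySensor, final Time time) {
        this.e2eLatencySensor = e2eLatencySensor;
        this.time = time;
    }

    void maybeRecordE2ELatency(final long recordTimestampMs) {
        // Only record when the recording level is enabled and metrics are attached,
        // keeping the common path cheap.
        if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {
            final long nowMs = time.milliseconds();
            e2eLatencySensor.record(nowMs - recordTimestampMs, nowMs);
        }
    }
}
```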
## File path: streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java ## @@ -668,6 +671,9 @@ private void checkKeyValueStoreMetrics(final String group0100To24, checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_CURRENT, 0); checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_AVG, 0); checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_MAX, 0); +checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_AVG, expectedNumberofE2ELatencyMetrics); +checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MIN, expectedNumberofE2ELatencyMetrics); +checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MAX, expectedNumberofE2ELatencyMetrics); Review comment: Ok there's something I'm not understanding about this test and/or the built-in metrics version. For some reason, the KV-store metrics are 0 when `METRICS_0100_TO_24` is used, and 1 (as expected) when the latest version in used. I feel like this is wrong, and it should al
[GitHub] [kafka] guozhangwang commented on pull request #8993: KAFKA-10173: Use SmokeTest for upgrade system tests (#8938)
guozhangwang commented on pull request #8993: URL: https://github.com/apache/kafka/pull/8993#issuecomment-665190912 Sorry for the long delay @vvcephei , the PR lgtm! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on pull request #8936: KAFKA-10207: Fixed padded timeindex causing premature data deletion
junrao commented on pull request #8936: URL: https://github.com/apache/kafka/pull/8936#issuecomment-665148070 I think we agreed to change the logic to do the index sanity check on index opening. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10322) InMemoryWindowStore restore keys format incompatibility (lack of sequenceNumber in keys on topic)
Tomasz Bradło created KAFKA-10322: - Summary: InMemoryWindowStore restore keys format incompatibility (lack of sequenceNumber in keys on topic) Key: KAFKA-10322 URL: https://issues.apache.org/jira/browse/KAFKA-10322 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.5.0 Environment: windows/linux Reporter: Tomasz Bradło I have regular groupBy&Counting stream configuration: {code:java} fun addStream(kStreamBuilder: StreamsBuilder) { val storeSupplier = Stores.inMemoryWindowStore("count-store", Duration.ofDays(10), Duration.ofDays(1), false) val storeBuilder: StoreBuilder> = Stores .windowStoreBuilder(storeSupplier, JsonSerde(CountableEvent::class.java), Serdes.Long()) kStreamBuilder .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String())) .map {_, jsonRepresentation -> KeyValue(eventsCountingDeserializer.deserialize(jsonRepresentation), null)} .groupByKey() .windowedBy(TimeWindows.of(Duration.ofDays(1))) .count(Materialized.with(JsonSerde(CountableEvent::class.java), Serdes.Long())) .toStream() .to("topic1-count") val storeConsumed = Consumed.with(WindowedSerdes.TimeWindowedSerde(JsonSerde(CountableEvent::class.java), Duration.ofDays(1).toMillis()), Serdes.Long()) kStreamBuilder.addGlobalStore(storeBuilder, "topic1-count", storeConsumed, passThroughProcessorSupplier) }{code} While sending to "topic1-count", for serializing the key [TimeWindowedSerializer|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java] is used which is using [WindowKeySchema.toBinary|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L112] so the message key format is: {code:java} real_grouping_key + timestamp(8bytes){code} Everything works. I can get correct values from state-store. But, in recovery scenario, when [GlobalStateManagerImpl |https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L317]enters offset < highWatermark loop then [InMemoryWindowStore stateRestoreCallback |https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java#L105]reads from "topic1-count" and fails to extract valid key and timestamp using [WindowKeySchema.extractStoreKeyBytes |https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L188]and [WindowKeySchema.extractStoreTimestamp. |https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L201]It fails because it expects format: {code:java} real_grouping_key + timestamp(8bytes) + sequence_number(4bytes) {code} How this is supposed to work in this case? -- This message was sent by Atlassian Jira (v8.3.4#803005)
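For illustration, the mismatch between the two key layouts can be sketched at the byte level as below; this is not Kafka's WindowKeySchema code, only a picture of the formats described above:
{code:java}
import java.nio.ByteBuffer;

// The changelog key written by TimeWindowedSerializer: [serialized key][8-byte window start].
// The key the store's restore logic expects: [serialized key][8-byte window start][4-byte seqnum].
final class WindowKeyLayoutSketch {

    static byte[] changelogKey(final byte[] serializedKey, final long windowStartMs) {
        return ByteBuffer.allocate(serializedKey.length + 8)
            .put(serializedKey)
            .putLong(windowStartMs)
            .array();
    }

    static byte[] storeKey(final byte[] serializedKey, final long windowStartMs, final int seqnum) {
        return ByteBuffer.allocate(serializedKey.length + 8 + 4)
            .put(serializedKey)
            .putLong(windowStartMs)
            .putInt(seqnum)
            .array();
    }
}
{code}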
[jira] [Commented] (KAFKA-6733) Support of printing additional ConsumerRecord fields in DefaultMessageFormatter
[ https://issues.apache.org/jira/browse/KAFKA-6733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167207#comment-17167207 ] Badai Aqrandista commented on KAFKA-6733: - I did a bad merge on PR 8909, so I've closed it. I created a new PR 9099 that contains the changes against the latest trunk. This is ready for review: https://github.com/apache/kafka/pull/9099 > Support of printing additional ConsumerRecord fields in > DefaultMessageFormatter > --- > > Key: KAFKA-6733 > URL: https://issues.apache.org/jira/browse/KAFKA-6733 > Project: Kafka > Issue Type: Improvement > Components: tools >Reporter: Mateusz Zakarczemny >Assignee: Badai Aqrandista >Priority: Minor > > It would be useful to have possibility of printing headers, partition and > offset in ConsoleConsumer. Especially support of headers seems to be missing. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10323) NullPointerException during rebalance
yazgoo created KAFKA-10323: -- Summary: NullPointerException during rebalance Key: KAFKA-10323 URL: https://issues.apache.org/jira/browse/KAFKA-10323 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 2.5.0 Reporter: yazgoo *confluent platform version: 5.5.0-ccs* connector used: s3 Connector stops after rebalancing: ERROR [Worker clientId=connect-1, groupId=connect] Couldn't instantiate task because it has an invalid task configuration. This task will not execute until reconfigured. java.lang.NullPointerException at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:427) at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1147) at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:126) at org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1162) at org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1158) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9273) Refactor AbstractJoinIntegrationTest and Sub-classes
[ https://issues.apache.org/jira/browse/KAFKA-9273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167321#comment-17167321 ] Albert Lowis commented on KAFKA-9273: - Hi [~bbejeck] , [~sujayopensource] I am a newbie looking to contribute. Can I take up this task, since it has been some time since the last activity? Thank you, Albert > Refactor AbstractJoinIntegrationTest and Sub-classes > > > Key: KAFKA-9273 > URL: https://issues.apache.org/jira/browse/KAFKA-9273 > Project: Kafka > Issue Type: Test > Components: streams >Reporter: Bill Bejeck >Assignee: Sujay Hegde >Priority: Major > Labels: newbie > > The AbstractJoinIntegrationTest uses an embedded broker, but not all the > sub-classes require the use of an embedded broker anymore. Additionally, > there are two test remaining that require an embedded broker, but they don't > perform joins, the are tests validating other conditions, so ideally those > tests should move into a separate test -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9273) Refactor AbstractJoinIntegrationTest and Sub-classes
[ https://issues.apache.org/jira/browse/KAFKA-9273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167349#comment-17167349 ] Bill Bejeck commented on KAFKA-9273: Hi [~albert02lowis], Thanks for your interest. It looks like [~sujayopensource] has not started work on this ticket yet. I'd give another day or so to respond, then if you don't hear anything back, feel free to pick this ticket up. I've taken the liberty of adding you to the contributors list, so you should be able to self-assign this ticket and any others in the future. -Bill > Refactor AbstractJoinIntegrationTest and Sub-classes > > > Key: KAFKA-9273 > URL: https://issues.apache.org/jira/browse/KAFKA-9273 > Project: Kafka > Issue Type: Test > Components: streams >Reporter: Bill Bejeck >Assignee: Sujay Hegde >Priority: Major > Labels: newbie > > The AbstractJoinIntegrationTest uses an embedded broker, but not all the > sub-classes require the use of an embedded broker anymore. Additionally, > there are two test remaining that require an embedded broker, but they don't > perform joins, the are tests validating other conditions, so ideally those > tests should move into a separate test -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format
Tommy Becker created KAFKA-10324: Summary: Pre-0.11 consumers can get stuck when messages are downconverted from V2 format Key: KAFKA-10324 URL: https://issues.apache.org/jira/browse/KAFKA-10324 Project: Kafka Issue Type: Bug Reporter: Tommy Becker As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset even if that offset gets removed due to log compaction. If a pre-0.11 consumer seeks to such an offset and issues a fetch, it will get an empty batch, since offsets prior to the requested one are filtered out during down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch offset in this case, but this leaves old consumers unable to consume these topics. The exact behavior varies depending on consumer version. The 0.10.0.0 consumer throws RecordTooLargeException and dies, believing that the record must not have been returned because it was too large. The 0.10.1.0 consumer simply spins fetching the same empty batch over and over. -- This message was sent by Atlassian Jira (v8.3.4#803005)
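Roughly, the consumer-side handling added in KAFKA-5443 for the v2 format can be sketched as below; the method name is hypothetical and this is not the actual Fetcher code, only the idea of stepping over a compacted-away batch using its surviving lastOffset:
{code:java}
// Sketch only: with the v2 format the batch header's lastOffset survives compaction,
// so the client can advance past a batch whose records were all removed. Down-converted
// pre-0.11 formats lose that header, which is why old consumers get stuck.
final class EmptyBatchSkipSketch {
    long advancePastEmptyBatch(final long fetchOffset, final long batchLastOffset, final int recordCount) {
        if (recordCount == 0 && batchLastOffset >= fetchOffset) {
            return batchLastOffset + 1; // move past the empty (compacted) batch
        }
        return fetchOffset; // otherwise keep the current position
    }
}
{code}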
[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format
[ https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167357#comment-17167357 ] Tommy Becker commented on KAFKA-10324: -- We have some legacy applications whose consumer versions are not easily upgraded hitting this issue, and it's hard to diagnose since the consumers do not give a proper message (or indeed any message in the case of the 0.10.1.0 consumer) and since it is dependent on the way messages are batched, which is opaque to clients. > Pre-0.11 consumers can get stuck when messages are downconverted from V2 > format > --- > > Key: KAFKA-10324 > URL: https://issues.apache.org/jira/browse/KAFKA-10324 > Project: Kafka > Issue Type: Bug >Reporter: Tommy Becker >Priority: Major > > As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset > even if that offset gets removed due to log compaction. If a pre-0.11 > consumer seeks to such an offset and issues a fetch, it will get an empty > batch, since offsets prior to the requested one are filtered out during > down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch > offset in this case, but this leaves old consumers unable to consume these > topics. > The exact behavior varies depending on consumer version. The 0.10.0.0 > consumer throws RecordTooLargeException and dies, believing that the record > must not have been returned because it was too large. The 0.10.1.0 consumer > simply spins fetching the same empty batch over and over. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format
[ https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167369#comment-17167369 ] Ismael Juma commented on KAFKA-10324: - Thanks for the report. Looks like this issue was introduced in 0.11.0.0, would you agree? Interesting that no-one reported it for so long. Out of curiosity, the old consumers are Java consumers or other languages? > Pre-0.11 consumers can get stuck when messages are downconverted from V2 > format > --- > > Key: KAFKA-10324 > URL: https://issues.apache.org/jira/browse/KAFKA-10324 > Project: Kafka > Issue Type: Bug >Reporter: Tommy Becker >Priority: Major > > As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset > even if that offset gets removed due to log compaction. If a pre-0.11 > consumer seeks to such an offset and issues a fetch, it will get an empty > batch, since offsets prior to the requested one are filtered out during > down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch > offset in this case, but this leaves old consumers unable to consume these > topics. > The exact behavior varies depending on consumer version. The 0.10.0.0 > consumer throws RecordTooLargeException and dies, believing that the record > must not have been returned because it was too large. The 0.10.1.0 consumer > simply spins fetching the same empty batch over and over. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format
[ https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167371#comment-17167371 ] Tommy Becker commented on KAFKA-10324: -- Thanks for the response [~ijuma]. Yes, these are Java consumers, and I agree it's odd that this has not been found before now. I have limited familiarity with the code base, so it's possible I'm missing something, but I believe the issue is as I described. In my tests I'm trying to consume a 25GB topic and have found 2 distinct offsets which the consumer cannot advance beyond, and they are both cases where the offset:
1. Is the lastOffset in the last batch of its log segment
2. Does not actually exist, presumably due to log compaction.
> Pre-0.11 consumers can get stuck when messages are downconverted from V2 > format > --- > > Key: KAFKA-10324 > URL: https://issues.apache.org/jira/browse/KAFKA-10324 > Project: Kafka > Issue Type: Bug >Reporter: Tommy Becker >Priority: Major > > As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset > even if that offset gets removed due to log compaction. If a pre-0.11 > consumer seeks to such an offset and issues a fetch, it will get an empty > batch, since offsets prior to the requested one are filtered out during > down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch > offset in this case, but this leaves old consumers unable to consume these > topics. > The exact behavior varies depending on consumer version. The 0.10.0.0 > consumer throws RecordTooLargeException and dies, believing that the record > must not have been returned because it was too large. The 0.10.1.0 consumer > simply spins fetching the same empty batch over and over. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format
[ https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167387#comment-17167387 ] Jason Gustafson commented on KAFKA-10324: - [~twbecker] Thanks for the report. I'm trying to understand why the fetch is not including batches beyond the one with the last offset removed. Is that because the batch itself is already satisfying the fetch max bytes? It would be helpful to include a snippet from a dump of the log with the batch that is causing problems. > Pre-0.11 consumers can get stuck when messages are downconverted from V2 > format > --- > > Key: KAFKA-10324 > URL: https://issues.apache.org/jira/browse/KAFKA-10324 > Project: Kafka > Issue Type: Bug >Reporter: Tommy Becker >Priority: Major > > As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset > even if that offset gets removed due to log compaction. If a pre-0.11 > consumer seeks to such an offset and issues a fetch, it will get an empty > batch, since offsets prior to the requested one are filtered out during > down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch > offset in this case, but this leaves old consumers unable to consume these > topics. > The exact behavior varies depending on consumer version. The 0.10.0.0 > consumer throws RecordTooLargeException and dies, believing that the record > must not have been returned because it was too large. The 0.10.1.0 consumer > simply spins fetching the same empty batch over and over. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] gwenshap commented on pull request #9054: KAFKA-10282: Remove Log metrics immediately when deleting log
gwenshap commented on pull request #9054: URL: https://github.com/apache/kafka/pull/9054#issuecomment-665773125 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] soarez commented on pull request #9000: KAFKA-10036 Improve handling and documentation of Suppliers
soarez commented on pull request #9000: URL: https://github.com/apache/kafka/pull/9000#issuecomment-665668720 @mjsax what can we do to proceed? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] badaiaqrandista commented on pull request #4807: KAFKA-6733: Support of printing additional ConsumerRecord fields in DefaultMessageFormatter
badaiaqrandista commented on pull request #4807: URL: https://github.com/apache/kafka/pull/4807#issuecomment-665660018 Closing this PR as it has been superseded by PR 9099 (https://github.com/apache/kafka/pull/9099). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on pull request #9096: MINOR: Add comments to constrainedAssign and generalAssign method
chia7712 commented on pull request #9096: URL: https://github.com/apache/kafka/pull/9096#issuecomment-665504255 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9012: KAFKA-10270: A broker to controller channel manager
mumrah commented on a change in pull request #9012: URL: https://github.com/apache/kafka/pull/9012#discussion_r462352682 ## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.util.concurrent.{LinkedBlockingDeque, TimeUnit} + +import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler} +import kafka.utils.Logging +import org.apache.kafka.clients._ +import org.apache.kafka.common.requests.AbstractRequest +import org.apache.kafka.common.utils.{LogContext, Time} +import org.apache.kafka.common.Node +import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.network._ +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.security.JaasContext + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * This class manages the connection between a broker and the controller. It runs a single + * {@link BrokerToControllerRequestThread} which uses the broker's metadata cache as its own metadata to find + * and connect to the controller. The channel is async and runs the network connection in the background. + * The maximum number of in-flight requests are set to one to ensure orderly response from the controller, therefore + * care must be taken to not block on outstanding requests for too long. 
+ */ +class BrokerToControllerChannelManager(metadataCache: kafka.server.MetadataCache, + time: Time, + metrics: Metrics, + config: KafkaConfig, + threadNamePrefix: Option[String] = None) extends Logging { + private val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem] + private val logContext = new LogContext(s"[broker-${config.brokerId}-to-controller] ") + private val manualMetadataUpdater = new ManualMetadataUpdater() + private val requestThread = newRequestThread + + def start(): Unit = { +requestThread.start() + } + + def shutdown(): Unit = { +requestThread.shutdown() +requestThread.awaitShutdown() + } + + private[server] def newRequestThread = { +val brokerToControllerListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName) +val brokerToControllerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol) + +val networkClient = { + val channelBuilder = ChannelBuilders.clientChannelBuilder( +brokerToControllerSecurityProtocol, +JaasContext.Type.SERVER, +config, +brokerToControllerListenerName, +config.saslMechanismInterBrokerProtocol, +time, +config.saslInterBrokerHandshakeRequestEnable, +logContext + ) + val selector = new Selector( +NetworkReceive.UNLIMITED, +Selector.NO_IDLE_TIMEOUT_MS, +metrics, +time, +"BrokerToControllerChannel", +Map("BrokerId" -> config.brokerId.toString).asJava, +false, +channelBuilder, +logContext + ) + new NetworkClient( +selector, +manualMetadataUpdater, +config.brokerId.toString, +1, +0, +0, +Selectable.USE_DEFAULT_BUFFER_SIZE, +Selectable.USE_DEFAULT_BUFFER_SIZE, +config.requestTimeoutMs, +config.connectionSetupTimeoutMs, +config.connectionSetupTimeoutMaxMs, +ClientDnsLookup.USE_ALL_DNS_IPS, +time, +false, +new ApiVersions, +logContext + ) +} +val threadName = threadNamePrefix match { + case None => s"broker-${config.brokerId}-to-controller-send-thread" + case Some(name) => s"$name:broker-${config.brokerId}-to-controller-send-thread" +} + +new BrokerToControllerRequestThread(networkClient, manualMetadataUpdater, requestQueue, metadataCache, config, + brokerToControllerListenerName, time, threadName) + } + + private[server] def sendRequest(request: AbstractRequest.Builder[_ <: AbstractRequest]
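The javadoc quoted above describes the overall shape of the manager: a request queue drained by a single sender thread, with at most one request in flight so that controller responses come back in order. As a rough, generic sketch of that pattern (illustrative Java only; these class and method names are not Kafka's API):

// Generic sketch of the queue-plus-single-sender-thread pattern described in the
// javadoc above; the transport function stands in for the network client.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Consumer;
import java.util.function.Function;

final class SingleInFlightSender<Req, Resp> {
    private static final class Item<Req, Resp> {
        final Req request;
        final Consumer<Resp> onComplete;

        Item(Req request, Consumer<Resp> onComplete) {
            this.request = request;
            this.onComplete = onComplete;
        }
    }

    private final BlockingQueue<Item<Req, Resp>> queue = new LinkedBlockingDeque<>();
    private final Thread sender;

    SingleInFlightSender(Function<Req, Resp> transport, String threadName) {
        // One sender thread with at most one in-flight request keeps responses ordered,
        // which is why callers must not block on outstanding requests for too long.
        this.sender = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    Item<Req, Resp> item = queue.take();
                    item.onComplete.accept(transport.apply(item.request));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, threadName);
    }

    void start() { sender.start(); }

    void shutdown() { sender.interrupt(); }

    // Enqueue a request; the callback runs on the sender thread once a response arrives.
    void sendRequest(Req request, Consumer<Resp> onComplete) {
        queue.add(new Item<>(request, onComplete));
    }
}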
[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
mumrah commented on a change in pull request #9008: URL: https://github.com/apache/kafka/pull/9008#discussion_r462364719 ## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java ## @@ -366,225 +255,128 @@ public FetchResponse(Errors error, LinkedHashMap> responseData, int throttleTimeMs, int sessionId) { -this.error = error; -this.responseData = responseData; -this.throttleTimeMs = throttleTimeMs; -this.sessionId = sessionId; +this.data = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), sessionId); +this.responseDataMap = responseData; } -public static FetchResponse parse(Struct struct) { -LinkedHashMap> responseData = new LinkedHashMap<>(); -for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) { -Struct topicResponse = (Struct) topicResponseObj; -String topic = topicResponse.get(TOPIC_NAME); -for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) { -Struct partitionResponse = (Struct) partitionResponseObj; -Struct partitionResponseHeader = partitionResponse.getStruct(PARTITION_HEADER_KEY_NAME); -int partition = partitionResponseHeader.get(PARTITION_ID); -Errors error = Errors.forCode(partitionResponseHeader.get(ERROR_CODE)); -long highWatermark = partitionResponseHeader.get(HIGH_WATERMARK); -long lastStableOffset = partitionResponseHeader.getOrElse(LAST_STABLE_OFFSET, INVALID_LAST_STABLE_OFFSET); -long logStartOffset = partitionResponseHeader.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET); -Optional preferredReadReplica = Optional.of( -partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, INVALID_PREFERRED_REPLICA_ID) - ).filter(Predicate.isEqual(INVALID_PREFERRED_REPLICA_ID).negate()); - -BaseRecords baseRecords = partitionResponse.getRecords(RECORD_SET_KEY_NAME); -if (!(baseRecords instanceof MemoryRecords)) -throw new IllegalStateException("Unknown records type found: " + baseRecords.getClass()); -MemoryRecords records = (MemoryRecords) baseRecords; - -List abortedTransactions = null; -if (partitionResponseHeader.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) { -Object[] abortedTransactionsArray = partitionResponseHeader.getArray(ABORTED_TRANSACTIONS_KEY_NAME); -if (abortedTransactionsArray != null) { -abortedTransactions = new ArrayList<>(abortedTransactionsArray.length); -for (Object abortedTransactionObj : abortedTransactionsArray) { -Struct abortedTransactionStruct = (Struct) abortedTransactionObj; -long producerId = abortedTransactionStruct.get(PRODUCER_ID); -long firstOffset = abortedTransactionStruct.get(FIRST_OFFSET); -abortedTransactions.add(new AbortedTransaction(producerId, firstOffset)); -} -} -} - -PartitionData partitionData = new PartitionData<>(error, highWatermark, lastStableOffset, -logStartOffset, preferredReadReplica, abortedTransactions, records); -responseData.put(new TopicPartition(topic, partition), partitionData); -} -} -return new FetchResponse<>(Errors.forCode(struct.getOrElse(ERROR_CODE, (short) 0)), responseData, -struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME), struct.getOrElse(SESSION_ID, INVALID_SESSION_ID)); +public FetchResponse(FetchResponseData fetchResponseData) { +this.data = fetchResponseData; +this.responseDataMap = toResponseDataMap(fetchResponseData); } @Override public Struct toStruct(short version) { -return toStruct(version, throttleTimeMs, error, responseData.entrySet().iterator(), sessionId); +return data.toStruct(version); } @Override -protected Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) { -Struct 
responseHeaderStruct = responseHeader.toStruct(); -Struct responseBodyStruct = toStruct(apiVersion); - -// write the total size and the response header -ByteBuffer buffer = ByteBuffer.allocate(responseHeaderStruct.sizeOf() + 4); -buffer.putInt(responseHeaderStruct.sizeOf() + responseBodyStruct.sizeOf()); -responseHeaderStruct.writeTo(buffer); +public Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) { +// Generate the Sends for
[GitHub] [kafka] abbccdda commented on pull request #9012: KAFKA-10270: A broker to controller channel manager
abbccdda commented on pull request #9012: URL: https://github.com/apache/kafka/pull/9012#issuecomment-665757647 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #9087: HOTFIX: Set session timeout and heartbeat interval to default to decrease flakiness
guozhangwang commented on pull request #9087: URL: https://github.com/apache/kafka/pull/9087#issuecomment-665779451 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] badaiaqrandista opened a new pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter
badaiaqrandista opened a new pull request #9099: URL: https://github.com/apache/kafka/pull/9099 Implementation of KIP-431 - Support of printing additional ConsumerRecord fields in DefaultMessageFormatter https://cwiki.apache.org/confluence/display/KAFKA/KIP-431%3A+Support+of+printing+additional+ConsumerRecord+fields+in+DefaultMessageFormatter *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a change in pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics
cadonna commented on a change in pull request #9094: URL: https://github.com/apache/kafka/pull/9094#discussion_r462327050 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -227,6 +234,14 @@ protected Bytes keyBytes(final K key) { return byteEntries; } +private void maybeRecordE2ELatency() { +if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) { Review comment: I think, you do not need to check for metrics with `e2eLatencySensor.hasMetrics()`. There should always be metrics within this sensor. `hasMetrics()` is used in `StreamsMetricsImpl#maybeMeasureLatency()` because some sensors may not contain any metrics due to the built-in metrics version. For instance, the destroy sensor exists for built-in metrics version 0.10.0-2.4 but not for latest. To avoid version checks in the record processing code, we just create an empty sensor and call record on it effectively not recording any metrics for this sensor for version latest. We do not hide newly added metrics if the built-in version is set to an older version. Same applies to the other uses of `hasMetrics()` introduced in this PR. ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java ## @@ -248,4 +253,12 @@ public void close() { private Bytes keyBytes(final K key) { return Bytes.wrap(serdes.rawKey(key)); } + +private void maybeRecordE2ELatency() { +if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) { Review comment: Your approach makes sense to me. I agree that the latency should refer to the update in the state store and not to record itself. If a record updates the state more than once then latency should be measured each time. ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java ## @@ -443,6 +447,25 @@ public static Sensor suppressionBufferSizeSensor(final String threadId, ); } +public static Sensor e2ELatencySensor(final String threadId, + final String taskId, + final String storeType, + final String storeName, + final StreamsMetricsImpl streamsMetrics) { +final Sensor sensor = streamsMetrics.storeLevelSensor(threadId, taskId, storeName, RECORD_E2E_LATENCY, RecordingLevel.TRACE); +final Map tagMap = streamsMetrics.storeLevelTagMap(threadId, taskId, storeType, storeName); +addAvgAndMinAndMaxToSensor( +sensor, +STATE_STORE_LEVEL_GROUP, Review comment: You need to use the `stateStoreLevelGroup()` here instead of `STATE_STORE_LEVEL_GROUP` because the group name depends on the version and the store type. ## File path: streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java ## @@ -668,6 +671,9 @@ private void checkKeyValueStoreMetrics(final String group0100To24, checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_CURRENT, 0); checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_AVG, 0); checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_MAX, 0); +checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_AVG, expectedNumberofE2ELatencyMetrics); +checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MIN, expectedNumberofE2ELatencyMetrics); +checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MAX, expectedNumberofE2ELatencyMetrics); Review comment: I agree with you, it should always be 1. It is the group of the metrics. See my comment in `StateStoreMetrics`. I am glad this test served its purpose, because I did not notice this in the unit tests! 
## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ## @@ -468,38 +468,44 @@ public void shouldRecordE2ELatencyOnProcessForSourceNodes() { } @Test -public void shouldRecordE2ELatencyMinAndMax() { +public void shouldRecordE2ELatencyAvgAndMinAndMax() { time = new MockTime(0L, 0L, 0L); metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), time); task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST); final String sourceNode = source1.name(); -final Metric maxMetric = getProcessorMetric("record-e2e-latency", "%s-max", task.id().toString(), sourceNode, StreamsConfig.METRICS_LATEST); +final Metric avgMetric = getProcessorMetric("record-e2e-latency", "%s-avg", task.id().toString(), sourceNode, StreamsConfig.METRICS_LATEST); final Metric minMetric = getProcessorMetric("record-e2e-latency", "%s-min", ta
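For context, the guard pattern being discussed looks roughly like the sketch below: the sensor's shouldRecord() check (which honours the configured recording level, here TRACE) is enough on its own when the sensor is always created with its metrics, and hasMetrics() only matters for sensors deliberately left empty under older built-in metrics versions. Illustrative Java; Sensor and Time are real Kafka classes, but the surrounding recorder class is invented for this example.

// Sketch of the recording guard under discussion; only Sensor#shouldRecord() and
// Sensor#record(value, timeMs) are the real Kafka Metrics APIs referenced here.
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;

final class E2eLatencyRecorder {
    private final Sensor e2eLatencySensor;
    private final Time time;

    E2eLatencyRecorder(Sensor e2eLatencySensor, Time time) {
        this.e2eLatencySensor = e2eLatencySensor;
        this.time = time;
    }

    // Per the review comment, a sensor that always has its metrics registered only needs
    // the shouldRecord() check; hasMetrics() is for sensors that may be intentionally empty.
    void maybeRecordE2eLatency(long recordTimestampMs) {
        if (e2eLatencySensor.shouldRecord()) {
            long nowMs = time.milliseconds();
            e2eLatencySensor.record(nowMs - recordTimestampMs, nowMs);
        }
    }
}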
[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format
[ https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167400#comment-17167400 ] Tommy Becker commented on KAFKA-10324: -- [~hachikuji] wrt why the broker does not seem to send subsequent batches, I'm not sure. But I can tell you I see this behavior even with max.partition.fetch.bytes set to Integer.MAX_VALUE. Maybe this has something to do with down conversion? Anyway, here's an excerpt from a dump of the segment containing the problematic offset, which is 13920987:
baseOffset: 13920966 lastOffset: 13920987 count: 6 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 49 isTransactional: false isControl: false position: 98516844 CreateTime: 1595224747691 size: 4407 magic: 2 compresscodec: NONE crc: 1598305187 isvalid: true
| offset: 13920978 CreateTime: 1595224747691 keysize: 36 valuesize: 681 sequence: -1 headerKeys: []
| offset: 13920979 CreateTime: 1595224747691 keysize: 36 valuesize: 677 sequence: -1 headerKeys: []
| offset: 13920980 CreateTime: 1595224747691 keysize: 36 valuesize: 680 sequence: -1 headerKeys: []
| offset: 13920984 CreateTime: 1595224747691 keysize: 36 valuesize: 681 sequence: -1 headerKeys: []
| offset: 13920985 CreateTime: 1595224747691 keysize: 36 valuesize: 677 sequence: -1 headerKeys: []
| offset: 13920986 CreateTime: 1595224747691 keysize: 36 valuesize: 680 sequence: -1 headerKeys: []
### End of segment is here
> Pre-0.11 consumers can get stuck when messages are downconverted from V2 > format > --- > > Key: KAFKA-10324 > URL: https://issues.apache.org/jira/browse/KAFKA-10324 > Project: Kafka > Issue Type: Bug >Reporter: Tommy Becker >Priority: Major > > As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset > even if that offset gets removed due to log compaction. If a pre-0.11 > consumer seeks to such an offset and issues a fetch, it will get an empty > batch, since offsets prior to the requested one are filtered out during > down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch > offset in this case, but this leaves old consumers unable to consume these > topics. > The exact behavior varies depending on consumer version. The 0.10.0.0 > consumer throws RecordTooLargeException and dies, believing that the record > must not have been returned because it was too large. The 0.10.1.0 consumer > simply spins fetching the same empty batch over and over. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mumrah commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
mumrah commented on pull request #9008: URL: https://github.com/apache/kafka/pull/9008#issuecomment-665795947 I ran a console consumer perf test (at @hachikuji's suggestion) and took a profile. [profile screenshot] Zoomed in a bit on the records part: [zoomed profile screenshot] This was with only a handful of partitions on a single broker (on my laptop), but it confirms that the new FetchResponse serialization is hitting the same sendfile path as the previous code. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
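For readers unfamiliar with the sendfile path mentioned above: on the JVM it is exposed through FileChannel.transferTo, which hands the file-to-socket copy to the OS without pulling the record bytes into user space. A minimal, standalone illustration in plain NIO (this is not Kafka's FileRecords/MultiRecordsSend code):

// Minimal standalone illustration of a sendfile-style zero-copy transfer.
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class ZeroCopySend {
    // Copies a byte range of a log segment file straight to a socket (or any writable
    // channel); the loop is needed because transferTo may transfer fewer bytes than asked.
    static long sendFileRange(Path segment, long position, long count, WritableByteChannel dest) throws IOException {
        try (FileChannel channel = FileChannel.open(segment, StandardOpenOption.READ)) {
            long transferred = 0;
            while (transferred < count) {
                long n = channel.transferTo(position + transferred, count - transferred, dest);
                if (n <= 0) {
                    break; // end of file, or the destination cannot accept more right now
                }
                transferred += n;
            }
            return transferred;
        }
    }
}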
[jira] [Comment Edited] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format
[ https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167400#comment-17167400 ] Tommy Becker edited comment on KAFKA-10324 at 7/29/20, 5:46 PM: [~hachikuji] wrt why the broker does not seem to send subsequent batches, I'm not sure. But I can tell you I see this behavior even with max.partition.fetch.bytes set to Integer.MAX_VALUE. Maybe this has something to do with down conversion? Anyway, here's an excerpt from a dump of the segment containing the problematic offset, which is 13920987:
baseOffset: 13920966 lastOffset: 13920987 count: 6 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 49 isTransactional: false isControl: false position: 98516844 CreateTime: 1595224747691 size: 4407 magic: 2 compresscodec: NONE crc: 1598305187 isvalid: true
| offset: 13920978 CreateTime: 1595224747691 keysize: 36 valuesize: 681 sequence: -1 headerKeys: []
| offset: 13920979 CreateTime: 1595224747691 keysize: 36 valuesize: 677 sequence: -1 headerKeys: []
| offset: 13920980 CreateTime: 1595224747691 keysize: 36 valuesize: 680 sequence: -1 headerKeys: []
| offset: 13920984 CreateTime: 1595224747691 keysize: 36 valuesize: 681 sequence: -1 headerKeys: []
| offset: 13920985 CreateTime: 1595224747691 keysize: 36 valuesize: 677 sequence: -1 headerKeys: []
| offset: 13920986 CreateTime: 1595224747691 keysize: 36 valuesize: 680 sequence: -1 headerKeys: []
End of segment is here
was (Author: twbecker): [~hachikuji] wrt why the broker does not seem to send subsequent batches, I'm not sure. But I can tell you I see this behavior even with max.partition.fetch.bytes set to Integer.MAX_VALUE. Maybe this has something to do with down conversion? Anyway, here's an excerpt from a dump of the segment containing the problematic offset, which is 13920987:
baseOffset: 13920966 lastOffset: 13920987 count: 6 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 49 isTransactional: false isControl: false position: 98516844 CreateTime: 1595224747691 size: 4407 magic: 2 compresscodec: NONE crc: 1598305187 isvalid: true
| offset: 13920978 CreateTime: 1595224747691 keysize: 36 valuesize: 681 sequence: -1 headerKeys: []
| offset: 13920979 CreateTime: 1595224747691 keysize: 36 valuesize: 677 sequence: -1 headerKeys: []
| offset: 13920980 CreateTime: 1595224747691 keysize: 36 valuesize: 680 sequence: -1 headerKeys: []
| offset: 13920984 CreateTime: 1595224747691 keysize: 36 valuesize: 681 sequence: -1 headerKeys: []
| offset: 13920985 CreateTime: 1595224747691 keysize: 36 valuesize: 677 sequence: -1 headerKeys: []
| offset: 13920986 CreateTime: 1595224747691 keysize: 36 valuesize: 680 sequence: -1 headerKeys: []
### End of segment is here
> Pre-0.11 consumers can get stuck when messages are downconverted from V2 > format > --- > > Key: KAFKA-10324 > URL: https://issues.apache.org/jira/browse/KAFKA-10324 > Project: Kafka > Issue Type: Bug >Reporter: Tommy Becker >Priority: Major > > As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset > even if that offset gets removed due to log compaction. If a pre-0.11 > consumer seeks to such an offset and issues a fetch, it will get an empty > batch, since offsets prior to the requested one are filtered out during > down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch > offset in this case, but this leaves old consumers unable to consume these > topics.
> The exact behavior varies depending on consumer version. The 0.10.0.0 > consumer throws RecordTooLargeException and dies, believing that the record > must not have been returned because it was too large. The 0.10.1.0 consumer > simply spins fetching the same empty batch over and over. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] cadonna commented on pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording
cadonna commented on pull request #9098: URL: https://github.com/apache/kafka/pull/9098#issuecomment-665616943 Call for review: @vvcephei @guozhangwang This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter
dajac commented on a change in pull request #9099: URL: https://github.com/apache/kafka/pull/9099#discussion_r462306541 ## File path: core/src/main/scala/kafka/tools/ConsoleConsumer.scala ## @@ -459,48 +466,32 @@ class DefaultMessageFormatter extends MessageFormatter { var printKey = false var printValue = true var printPartition = false - var keySeparator = "\t".getBytes(StandardCharsets.UTF_8) - var lineSeparator = "\n".getBytes(StandardCharsets.UTF_8) + var printOffset = false + var printHeaders = false + var keySeparator = utfBytes("\t") + var lineSeparator = utfBytes("\n") + var headersSeparator = utfBytes(",") + var nullLiteral = utfBytes("null") var keyDeserializer: Option[Deserializer[_]] = None var valueDeserializer: Option[Deserializer[_]] = None - - override def configure(configs: Map[String, _]): Unit = { -val props = new java.util.Properties() -configs.asScala.foreach { case (key, value) => props.put(key, value.toString) } -if (props.containsKey("print.timestamp")) - printTimestamp = props.getProperty("print.timestamp").trim.equalsIgnoreCase("true") -if (props.containsKey("print.key")) - printKey = props.getProperty("print.key").trim.equalsIgnoreCase("true") -if (props.containsKey("print.value")) - printValue = props.getProperty("print.value").trim.equalsIgnoreCase("true") -if (props.containsKey("print.partition")) - printPartition = props.getProperty("print.partition").trim.equalsIgnoreCase("true") -if (props.containsKey("key.separator")) - keySeparator = props.getProperty("key.separator").getBytes(StandardCharsets.UTF_8) -if (props.containsKey("line.separator")) - lineSeparator = props.getProperty("line.separator").getBytes(StandardCharsets.UTF_8) -// Note that `toString` will be called on the instance returned by `Deserializer.deserialize` -if (props.containsKey("key.deserializer")) { - keyDeserializer = Some(Class.forName(props.getProperty("key.deserializer")).getDeclaredConstructor() -.newInstance().asInstanceOf[Deserializer[_]]) - keyDeserializer.get.configure(propertiesWithKeyPrefixStripped("key.deserializer.", props).asScala.asJava, true) -} -// Note that `toString` will be called on the instance returned by `Deserializer.deserialize` -if (props.containsKey("value.deserializer")) { - valueDeserializer = Some(Class.forName(props.getProperty("value.deserializer")).getDeclaredConstructor() -.newInstance().asInstanceOf[Deserializer[_]]) - valueDeserializer.get.configure(propertiesWithKeyPrefixStripped("value.deserializer.", props).asScala.asJava, false) -} - } - - private def propertiesWithKeyPrefixStripped(prefix: String, props: Properties): Properties = { -val newProps = new Properties() -props.asScala.foreach { case (key, value) => - if (key.startsWith(prefix) && key.length > prefix.length) -newProps.put(key.substring(prefix.length), value) -} -newProps + var headersDeserializer: Option[Deserializer[_]] = None + + override def init(props: Properties): Unit = { Review comment: `init(props: Properties)` has been deprecated. It would be great if we could keep using `configure(configs: Map[String, _])` as before. I think that we should also try to directly extract the values from the `Map` instead of using a `Properties`. ## File path: core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala ## @@ -0,0 +1,235 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package unit.kafka.tools + +import java.io.{ByteArrayOutputStream, Closeable, PrintStream} +import java.nio.charset.StandardCharsets +import java.util +import java.util.Properties + +import kafka.tools.DefaultMessageFormatter +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.header.Header +import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders} +import org.apache.kafka.common.record.TimestampType +import org.apache.kafka.common.serialization.Deserializer +import org.junit.Assert._ +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.run
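The suggestion above, reading options straight from the configure(Map[String, _]) argument instead of copying everything into a Properties first, could look roughly like the following sketch. It is written in Java for illustration; the option names mirror the ones in the diff, but the helper class itself is invented and is not part of Kafka.

// Illustrative sketch: extract formatter options directly from the configure() map.
import java.nio.charset.StandardCharsets;
import java.util.Map;

final class FormatterOptions {
    final boolean printKey;
    final boolean printHeaders;
    final byte[] keySeparator;

    FormatterOptions(Map<String, ?> configs) {
        this.printKey = getBoolean(configs, "print.key", false);
        this.printHeaders = getBoolean(configs, "print.headers", false);
        this.keySeparator = getBytes(configs, "key.separator", "\t");
    }

    private static boolean getBoolean(Map<String, ?> configs, String key, boolean defaultValue) {
        Object value = configs.get(key);
        return value == null ? defaultValue : String.valueOf(value).trim().equalsIgnoreCase("true");
    }

    private static byte[] getBytes(Map<String, ?> configs, String key, String defaultValue) {
        Object value = configs.get(key);
        return (value == null ? defaultValue : String.valueOf(value)).getBytes(StandardCharsets.UTF_8);
    }
}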