[GitHub] [kafka] vvcephei merged pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets

2020-07-29 Thread GitBox


vvcephei merged pull request #9066:
URL: https://github.com/apache/kafka/pull/9066


   







[GitHub] [kafka] guozhangwang commented on pull request #9084: MINOR: Preserve Kafka exception from RebalanceListener [Do Not Merge]

2020-07-29 Thread GitBox


guozhangwang commented on pull request #9084:
URL: https://github.com/apache/kafka/pull/9084#issuecomment-664698317


   @abbccdda The exception is thrown from consumer.poll, and the caller of the consumer could be expecting specific exceptions; e.g., a TaskMigratedException is handled specifically in Streams, whereas a general KafkaException would be treated as a fatal error.
   
   We're already doing this, e.g. in ConsumerCoordinator line 432, but it is not done everywhere, so I'm just trying to make the behavior consistent across all callers here.
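   As a rough sketch (not Streams' actual code) of why the concrete type matters to callers: if poll preserves the listener's exception, a caller can recover from a `TaskMigratedException` but must treat a plain `KafkaException` as fatal.
   
   ```java
   import java.time.Duration;
   import org.apache.kafka.clients.consumer.Consumer;
   import org.apache.kafka.common.KafkaException;
   import org.apache.kafka.streams.errors.TaskMigratedException;
   
   public class PollExceptionHandlingSketch {
       // Poll once and translate exceptions the way a Streams-style caller would.
       static void pollOnce(final Consumer<byte[], byte[]> consumer) {
           try {
               consumer.poll(Duration.ofMillis(100));
           } catch (final TaskMigratedException e) {
               // recoverable: the task is handed back through a rebalance
               handleTaskMigrated(e);
           } catch (final KafkaException e) {
               // anything that stays a generic KafkaException is treated as fatal
               throw e;
           }
       }
   
       static void handleTaskMigrated(final TaskMigratedException e) {
           // rejoin the group / re-initialize tasks; details omitted in this sketch
       }
   }
   ```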







[GitHub] [kafka] dajac commented on a change in pull request #9072: KAFKA-10162; Make the rate based quota behave more like a Token Bucket (KIP-599, Part III)

2020-07-29 Thread GitBox


dajac commented on a change in pull request #9072:
URL: https://github.com/apache/kafka/pull/9072#discussion_r461123154



##
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+public class TokenBucket extends SampledStat {

Review comment:
   Note to myself: Add javadoc.

##
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+public class TokenBucket extends SampledStat {
+
+private final TimeUnit unit;
+
+public TokenBucket() {
+this(TimeUnit.SECONDS);
+}
+
+public TokenBucket(TimeUnit unit) {

Review comment:
   That is correct. This is a bit unfortunate, but as the two are independent from each other, we can't enforce using the same time unit nor reuse the time unit of the rate.
   
   I will make this clear in the javadoc.

##
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+public class TokenBucket extends SampledStat {
+
+private final TimeUnit unit;
+
+public TokenBucket() {
+this(TimeUnit.SECONDS);
+}
+
+public TokenBucket(TimeUnit unit) {
+super(0);
+this.unit = unit;
+}
+
+@Override
+public void record(MetricConfig config, double value, long timeMs) {
+if (value < 0) {
+unrecord(config, -value, timeMs);
+return;
+}
+
+final double quota = quota(config);
+final long firstTimeWindowMs = firstTimeWindowMs(config, timeMs);
+
+// Get current sample or create one if empty
+Sample sample = current(firstTimeWindowMs);
+
+// Verify that the current sample was not reinitialized. If it was the case,
+// restart from the first time window

[GitHub] [kafka] guozhangwang commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting

2020-07-29 Thread GitBox


guozhangwang commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r461266795



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##
@@ -38,13 +41,39 @@
  */
 final class StateManagerUtil {
 static final String CHECKPOINT_FILE_NAME = ".checkpoint";
+static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;

Review comment:
   Yeah, my major concern is tying the flushing policy to RocksDB -- although it is the default persistent store now, we should avoid tying it to a specific type of store.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -454,6 +456,41 @@ public void flush() {
 }
 }
 
+public void flushCache() {
+RuntimeException firstException = null;
+// attempting to flush the stores
+if (!stores.isEmpty()) {
+log.debug("Flushing all store caches registered in the state 
manager: {}", stores);
+for (final StateStoreMetadata metadata : stores.values()) {
+final StateStore store = metadata.stateStore;
+
+try {
+// buffer should be flushed to send all records to changelog
+if (store instanceof TimeOrderedKeyValueBuffer) {

Review comment:
   I'm thinking we can remove the whole `flushCache` method.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##
@@ -38,13 +41,39 @@
  */
 final class StateManagerUtil {
 static final String CHECKPOINT_FILE_NAME = ".checkpoint";
+static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;
 
 private StateManagerUtil() {}
 
 static RecordConverter converterForStore(final StateStore store) {
 return isTimestamped(store) ? rawValueToTimestampedValue() : 
identity();
 }
 
+static boolean checkpointNeeded(final boolean enforceCheckpoint,
+final Map<TopicPartition, Long> oldOffsetSnapshot,
+final Map<TopicPartition, Long> newOffsetSnapshot) {
+// we should always have the old snapshot post completing the register state stores;
+// if it is null it means the registration is not done and hence we should not overwrite the checkpoint
+if (oldOffsetSnapshot == null)
+return false;
+
+// if the previous snapshot is empty while the current snapshot is not then we should always checkpoint;
+// note if the task is stateless or stateful but no stores logged, the snapshot would also be empty
+// and hence it's okay to not checkpoint
+if (oldOffsetSnapshot.isEmpty() && !newOffsetSnapshot.isEmpty())
+return true;
+
+// we can checkpoint if the difference between the current and the previous snapshot is large enough
+long totalOffsetDelta = 0L;
+for (final Map.Entry<TopicPartition, Long> entry : newOffsetSnapshot.entrySet()) {
+totalOffsetDelta += Math.abs(oldOffsetSnapshot.getOrDefault(entry.getKey(), 0L) - entry.getValue());
+}
+
+// when enforcing checkpoint is required, we should overwrite the checkpoint if it is different from the old one;
+// otherwise, we only overwrite the checkpoint if it is largely different from the old one
+return enforceCheckpoint ? totalOffsetDelta > 0 : totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;

Review comment:
   I think we do not need to enforce a checkpoint during suspension, but only during closure / recycling; if a suspended task is resumed, then we do not need to write a checkpoint in between.
   
   But admittedly, moving forward most suspended tasks would be closed or recycled :slightly_smiling_face: so I can change that back.
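   For illustration, a standalone sketch of the offset-delta rule from the hunk above, assuming `Map<TopicPartition, Long>` snapshots and the same 10,000 threshold (`shouldCheckpoint` is a made-up name, not the actual method):
   
   ```java
   import java.util.Map;
   import org.apache.kafka.common.TopicPartition;
   
   public class CheckpointDeltaExample {
       static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;
   
       // Same decision rule as the hunk above, extracted into a standalone helper.
       static boolean shouldCheckpoint(final Map<TopicPartition, Long> oldSnapshot,
                                       final Map<TopicPartition, Long> newSnapshot,
                                       final boolean enforceCheckpoint) {
           long totalOffsetDelta = 0L;
           for (final Map.Entry<TopicPartition, Long> entry : newSnapshot.entrySet()) {
               totalOffsetDelta += Math.abs(oldSnapshot.getOrDefault(entry.getKey(), 0L) - entry.getValue());
           }
           return enforceCheckpoint ? totalOffsetDelta > 0 : totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;
       }
   
       public static void main(final String[] args) {
           final TopicPartition tp = new TopicPartition("store-changelog", 0);
           System.out.println(shouldCheckpoint(Map.of(tp, 1_000L), Map.of(tp, 5_000L), false));  // false: delta of 4,000 is below the threshold
           System.out.println(shouldCheckpoint(Map.of(tp, 1_000L), Map.of(tp, 20_000L), false)); // true: delta of 19,000 exceeds the threshold
           System.out.println(shouldCheckpoint(Map.of(tp, 1_000L), Map.of(tp, 1_001L), true));   // true: enforced, so any change counts
       }
   }
   ```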









[GitHub] [kafka] rgroothuijsen commented on pull request #9089: KAFKA-10224: Update jersey license from CDDL to EPLv2

2020-07-29 Thread GitBox


rgroothuijsen commented on pull request #9089:
URL: https://github.com/apache/kafka/pull/9089#issuecomment-665171345


   @rhauch No problem, thanks for the help! I used the version from the Eclipse 
home page, but this one has much nicer formatting as well.







[GitHub] [kafka] rgroothuijsen opened a new pull request #9089: KAFKA-10224: Update jersey license from CDDL to EPLv2

2020-07-29 Thread GitBox


rgroothuijsen opened a new pull request #9089:
URL: https://github.com/apache/kafka/pull/9089


   Jersey has changed its license from CDDL to EPLv2 starting from version 
2.28. This PR updates the included license information to reflect this.







[GitHub] [kafka] jeffkbkim commented on a change in pull request #9050: KAFKA-10193: Add preemption for controller events that have callbacks

2020-07-29 Thread GitBox


jeffkbkim commented on a change in pull request #9050:
URL: https://github.com/apache/kafka/pull/9050#discussion_r461742057



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1986,101 +1965,125 @@ private[controller] class ControllerStats extends 
KafkaMetricsGroup {
 
 sealed trait ControllerEvent {
   def state: ControllerState
+  def preempt(): Unit
 }
 
 case object ControllerChange extends ControllerEvent {
-  override def state = ControllerState.ControllerChange
+  override def state: ControllerState = ControllerState.ControllerChange
+  override def preempt(): Unit = {}

Review comment:
   @mumrah One of the issues was that people are able to create controller events without implementing preemption. In most cases this is fine, because most events don't have callbacks. But for events that do have callbacks, someone can forget to implement preemption, and this won't be caught at compile time. This is to force an implementation of preemption, which can be empty, when designing a new event.

##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -2090,6 +2093,12 @@ case class ReplicaLeaderElection(
   callback: ElectLeadersCallback = _ => {}
 ) extends ControllerEvent {
   override def state: ControllerState = ControllerState.ManualLeaderBalance
+
+  override def preempt(): Unit = callback(
+partitionsFromAdminClientOpt.fold(Map.empty[TopicPartition, 
Either[ApiError, Int]]) { partitions =>

Review comment:
   @mumrah I believe the initial `Map.empty[TopicPartition, Either[ApiError, Int]]` was used to structure the return type (`ElectLeadersCallback: Map[TopicPartition, Either[ApiError, Int]] => Unit`), which cannot be achieved using `foreach`. 

##
File path: core/src/main/scala/kafka/controller/ControllerEventManager.scala
##
@@ -77,7 +77,7 @@ class ControllerEventManager(controllerId: Int,
   private val putLock = new ReentrantLock()
   private val queue = new LinkedBlockingQueue[QueuedEvent]
   // Visible for test
-  private[controller] val thread = new 
ControllerEventThread(ControllerEventThreadName)
+  private[controller] var thread = new 
ControllerEventThread(ControllerEventThreadName)

Review comment:
   @jsancio Updated the PR. I wrapped the entire method in a lock because the return value `QueuedEvent` is produced by `put(event)`; the suggested code would require another variable to be declared, then set inside the lock, then returned at the end. I thought it would be best to prefer simplicity and readability over performance, as this only occurs at the start and end of `ControllerEventManager`'s lifecycle.
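   An illustrative Java sketch of the trade-off being described (the real code is Scala, and `QueuedEvent`/`ControllerEvent` below are stand-ins): guarding the whole method with the lock lets `put()` build and return the enqueued item directly, instead of declaring a variable, assigning it under the lock, and returning it afterwards.
   
   ```java
   import java.util.ArrayDeque;
   import java.util.Queue;
   import java.util.concurrent.locks.ReentrantLock;
   
   class EventQueueSketch {
       interface ControllerEvent {}
       static final class QueuedEvent {
           final ControllerEvent event;
           QueuedEvent(final ControllerEvent event) { this.event = event; }
       }
   
       private final ReentrantLock putLock = new ReentrantLock();
       private final Queue<QueuedEvent> queue = new ArrayDeque<>();
   
       QueuedEvent put(final ControllerEvent event) {
           putLock.lock();
           try {
               final QueuedEvent queued = new QueuedEvent(event);
               queue.add(queued);
               return queued;              // returned straight from the guarded block
           } finally {
               putLock.unlock();
           }
       }
   }
   ```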









[GitHub] [kafka] guozhangwang commented on pull request #8964: KAFKA-9450: Decouple flushing state from commiting

2020-07-29 Thread GitBox


guozhangwang commented on pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#issuecomment-664688329


   @ableegoldman @vvcephei I've thought about controlling the frequency of flushing, and I've decided to expose another public config to let users specify it. But I'd defer that to another PR. I'd like to do the following:
   
   1) Merge this PR based on a hard-coded num.records difference.
   2) Run benchmarks to understand its implications on stateful applications with / without EOS.
   3) Then file a KIP that introduces a new config value, and potentially change the default commit interval with EOS.







[GitHub] [kafka] junrao commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-07-29 Thread GitBox


junrao commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-664652669











[GitHub] [kafka] rgibaiev edited a comment on pull request #8575: KAFKA-8713 KIP-581 Add accept.optional.null to solve optional null

2020-07-29 Thread GitBox


rgibaiev edited a comment on pull request #8575:
URL: https://github.com/apache/kafka/pull/8575#issuecomment-665225469


   Any update? @rhauch 







[GitHub] [kafka] guozhangwang commented on pull request #9087: HOTFIX: Set session timeout and heartbeat interval to default to decrease flakiness

2020-07-29 Thread GitBox


guozhangwang commented on pull request #9087:
URL: https://github.com/apache/kafka/pull/9087#issuecomment-665171108


   For correctness I agree; though for some tests we cover the rebalance triggered by a member dropping out of the group, so reducing the session timeout also largely reduces the overall run time of the test case. For test runtime, I'd say we may still want to check case by case.







[GitHub] [kafka] rhauch commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets

2020-07-29 Thread GitBox


rhauch commented on pull request #9066:
URL: https://github.com/apache/kafka/pull/9066#issuecomment-665065731











[GitHub] [kafka] apovzner commented on a change in pull request #9072: KAFKA-10162; Make the rate based quota behave more like a Token Bucket (KIP-599, Part III)

2020-07-29 Thread GitBox


apovzner commented on a change in pull request #9072:
URL: https://github.com/apache/kafka/pull/9072#discussion_r461297706



##
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+public class TokenBucket extends SampledStat {
+
+private final TimeUnit unit;
+
+public TokenBucket() {
+this(TimeUnit.SECONDS);
+}
+
+public TokenBucket(TimeUnit unit) {

Review comment:
   It would be useful to add javadoc here for the `unit` param. I understand the unit needs to match the Quota representation in the `config` param passed to the `record()` method, right?
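   For readers new to KIP-599, here is a minimal standalone illustration of token-bucket accounting -- tokens accrue at the quota rate up to a burst limit, and each recorded operation consumes them. It illustrates the concept only; it is not the `SampledStat`-based class under review:
   
   ```java
   public class SimpleTokenBucket {
       private final double quotaPerSec;  // refill rate (operations per second)
       private final double burst;        // maximum tokens the bucket can hold
       private double tokens;
       private long lastUpdateMs;
   
       public SimpleTokenBucket(final double quotaPerSec, final double burst, final long nowMs) {
           this.quotaPerSec = quotaPerSec;
           this.burst = burst;
           this.tokens = burst;
           this.lastUpdateMs = nowMs;
       }
   
       /** Consumes `amount` tokens and returns how long (in ms) the caller should be throttled. */
       public long record(final double amount, final long nowMs) {
           // refill based on elapsed time, capped at the burst size
           tokens = Math.min(burst, tokens + quotaPerSec * (nowMs - lastUpdateMs) / 1000.0);
           lastUpdateMs = nowMs;
           tokens -= amount;
           // a negative balance means the bucket is overdrawn until the deficit is repaid
           return tokens >= 0 ? 0L : (long) Math.ceil(-tokens / quotaPerSec * 1000.0);
       }
   }
   ```
   
   The spill-over loop in the hunk below achieves a similar effect on top of the existing sampled-stat windows, which is why the time unit of the bucket and the quota have to agree.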

##
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+public class TokenBucket extends SampledStat {
+
+private final TimeUnit unit;
+
+public TokenBucket() {
+this(TimeUnit.SECONDS);
+}
+
+public TokenBucket(TimeUnit unit) {
+super(0);
+this.unit = unit;
+}
+
+@Override
+public void record(MetricConfig config, double value, long timeMs) {
+if (value < 0) {
+unrecord(config, -value, timeMs);
+return;
+}
+
+final double quota = quota(config);
+final long firstTimeWindowMs = firstTimeWindowMs(config, timeMs);
+
+// Get current sample or create one if empty
+Sample sample = current(firstTimeWindowMs);
+
+// Verify that the current sample was not reinitialized. If it was the case,
+// restart from the first time window
+if (sample.eventCount == 0) {
+sample.reset(firstTimeWindowMs);
+}
+
+// Add the value to the current sample
+sample.value += value;
+sample.eventCount += 1;
+
+// If current sample is completed AND a new one can be created,
+// create one and spill over the amount above the quota to the
+// new sample. Repeat until either the sample is not complete
+// or no new sample can be created.
+while (sample.isComplete(timeMs, config)) {
+double extra = sample.value - quota;
+sample.value = quota;
+
+sample = advance(config, sample.lastWindowMs + config.timeWindowMs());
+sample.value = extra;
+sample.eventCount += 1;
+}
+}
+
+private void unrecord(MetricConfig config, double value, long timeMs) {
+final double quota = quota(config);
+final long firstTimeWindowMs = firstTimeWindowMs(config, timeMs);
+
+// Rewind
+while (value > 0) {
+// Get current sample or create one if empty
+Sample sample = current(firstTimeWindowMs);
+
+// If the current sample has been purged, we can't unrecord anything
+if (sample.eventCount == 0) {
+return;
+

[GitHub] [kafka] huxihx commented on pull request #8984: KAFKA-10227: Enforce cleanup policy to only contain compact or delete once

2020-07-29 Thread GitBox


huxihx commented on pull request #8984:
URL: https://github.com/apache/kafka/pull/8984#issuecomment-664804260


   Ping @omkreddy @mimaison for review.







[GitHub] [kafka] rhauch commented on pull request #8575: KAFKA-8713 KIP-581 Add accept.optional.null to solve optional null

2020-07-29 Thread GitBox


rhauch commented on pull request #8575:
URL: https://github.com/apache/kafka/pull/8575#issuecomment-665303585


   This PR has a conflict that has to be resolved first.
   
   Also, KIP-581 must be passed before this can be merged. See the discussion 
thread for the KIP.







[GitHub] [kafka] guozhangwang edited a comment on pull request #9011: KAFKA-10134: Still depends on existence of any fetchable partitions to block on join

2020-07-29 Thread GitBox


guozhangwang edited a comment on pull request #9011:
URL: https://github.com/apache/kafka/pull/9011#issuecomment-665182983


   @hachikuji That's what I was wondering as well. From the logs we have: 
https://issues.apache.org/jira/secure/attachment/13008127/consumer5.log.2020-07-22.log
   
   If you search for `Returning timer remaining` you can see that most of the time it is normal, and during that re-joining it is indeed returning 0, but that's expected since we could not connect to the broker during that period. So there's no surprising `timeToNextHeartbeat` of 0 in that run.







[GitHub] [kafka] mumrah commented on a change in pull request #9050: KAFKA-10193: Add preemption for controller events that have callbacks

2020-07-29 Thread GitBox


mumrah commented on a change in pull request #9050:
URL: https://github.com/apache/kafka/pull/9050#discussion_r461709093



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1986,101 +1965,125 @@ private[controller] class ControllerStats extends 
KafkaMetricsGroup {
 
 sealed trait ControllerEvent {
   def state: ControllerState
+  def preempt(): Unit
 }
 
 case object ControllerChange extends ControllerEvent {
-  override def state = ControllerState.ControllerChange
+  override def state: ControllerState = ControllerState.ControllerChange
+  override def preempt(): Unit = {}

Review comment:
   Can we define a default implementation on the trait rather than 
overriding all these with empty functions?

##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -2090,6 +2093,12 @@ case class ReplicaLeaderElection(
   callback: ElectLeadersCallback = _ => {}
 ) extends ControllerEvent {
   override def state: ControllerState = ControllerState.ManualLeaderBalance
+
+  override def preempt(): Unit = callback(
+partitionsFromAdminClientOpt.fold(Map.empty[TopicPartition, 
Either[ApiError, Int]]) { partitions =>

Review comment:
   nit: I think you can use `foreach` instead of fold + default value









[GitHub] [kafka] dajac opened a new pull request #9091: MINOR; Make KafkaAdminClientTest.testDescribeLogDirsPartialFailure and KafkaAdminClientTest.testAlterReplicaLogDirsPartialFailure test more reli

2020-07-29 Thread GitBox


dajac opened a new pull request #9091:
URL: https://github.com/apache/kafka/pull/9091


   I have seen them failing locally from time to time. It seems that the changes made in https://github.com/apache/kafka/pull/8864 have made them more fragile. I have updated them to be more reliable.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] showuon opened a new pull request #9096: Add comments to constrainedAssign and generalAssign method

2020-07-29 Thread GitBox


showuon opened a new pull request #9096:
URL: https://github.com/apache/kafka/pull/9096


   Recently, I tried to read the code to understand what the `constrainedAssign` and `generalAssign` methods are doing, but it was difficult and painful due to the complexity of the algorithms. I then traced back to the JIRA ticket and KIP to get more information about them. So, I think we should put the algorithm goals and main steps in the code comments, to let other developers understand them better and troubleshoot more easily if needed. 
   
   **reference for `constrainedAssign` algorithm:**
   
https://issues.apache.org/jira/browse/KAFKA-9987?focusedCommentId=17106832&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17106832
   
   **reference for `generalAssign` algorithm:**
   
https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] viktorsomogyi commented on pull request #4090: [KAFKA-6084] Propagate JSON parsing errors in ReassignPartitionsCommand

2020-07-29 Thread GitBox


viktorsomogyi commented on pull request #4090:
URL: https://github.com/apache/kafka/pull/4090#issuecomment-664920356


   Also the jdk11&scala2.13 build timed out due to another test:
   ```
   12:49:33 org.apache.kafka.streams.processor.internals.GlobalStreamThreadTest 
> shouldDieOnInvalidOffsetExceptionDuringStartup STARTED
   16:47:21 Build timed out (after 270 minutes). Marking the build as aborted.
   16:47:21 Build was aborted
   16:47:22 [FINDBUGS] Skipping publisher since build result is ABORTED
   16:47:22 Recording test results
   16:47:22 Setting MAVEN_LATEST__HOME=/home/jenkins/tools/maven/latest/
   16:47:22 Setting GRADLE_4_10_3_HOME=/home/jenkins/tools/gradle/4.10.3
   16:47:22 
   16:47:22 org.apache.kafka.streams.processor.internals.GlobalStreamThreadTest 
> shouldDieOnInvalidOffsetExceptionDuringStartup SKIPPED
   ```
   The test caused the build to get stuck and time out. Based on a successful 
local run this is also flaky.







[GitHub] [kafka] guozhangwang commented on pull request #9011: KAFKA-10134: Still depends on existence of any fetchable partitions to block on join

2020-07-29 Thread GitBox


guozhangwang commented on pull request #9011:
URL: https://github.com/apache/kafka/pull/9011#issuecomment-665182983


   That's what I was wondering as well. From the logs we have: 
https://issues.apache.org/jira/secure/attachment/13008127/consumer5.log.2020-07-22.log
   
   If you search for `Returning timer remaining` you can see that most of the time it is normal, and during that re-joining it is indeed returning 0, but that's expected since we could not connect to the broker during that period of time.







[GitHub] [kafka] rhauch merged pull request #9089: KAFKA-10224: Update jersey license from CDDL to EPLv2

2020-07-29 Thread GitBox


rhauch merged pull request #9089:
URL: https://github.com/apache/kafka/pull/9089


   







[GitHub] [kafka] tombentley commented on pull request #8312: KAFKA-9432 automated protocol for DescribeConfigs

2020-07-29 Thread GitBox


tombentley commented on pull request #8312:
URL: https://github.com/apache/kafka/pull/8312#issuecomment-664832499











[GitHub] [kafka] tombentley commented on pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()

2020-07-29 Thread GitBox


tombentley commented on pull request #9007:
URL: https://github.com/apache/kafka/pull/9007#issuecomment-664836543


   @mimaison done, thanks.







[GitHub] [kafka] ableegoldman commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting

2020-07-29 Thread GitBox


ableegoldman commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r461229173



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##
@@ -38,13 +41,39 @@
  */
 final class StateManagerUtil {
 static final String CHECKPOINT_FILE_NAME = ".checkpoint";
+static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;

Review comment:
   Something like that. We know RocksDB will render the memtable immutable once it reaches the configured memtable size; after that, it will flush once the number of immutable memtables reaches the configured value. It probably makes sense to align our checkpointing/flushing with the configured RocksDB flushing.
   
   It would be cool if we could piggy-back on the RocksDB options and avoid a new config in Streams altogether, but obviously not everyone uses RocksDB.
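   For reference, the two RocksDB knobs being discussed are usually tuned in Streams through a `RocksDBConfigSetter`; a minimal sketch with arbitrary example values (not values proposed in this PR):
   
   ```java
   import java.util.Map;
   import org.apache.kafka.streams.state.RocksDBConfigSetter;
   import org.rocksdb.Options;
   
   // Example values only; not taken from this PR.
   public class BoundedMemtableConfigSetter implements RocksDBConfigSetter {
       @Override
       public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
           // the memtable becomes immutable once it reaches this size...
           options.setWriteBufferSize(16 * 1024 * 1024L);
           // ...and RocksDB flushes once this many immutable memtables have accumulated
           options.setMaxWriteBufferNumber(2);
       }
   
       @Override
       public void close(final String storeName, final Options options) {}
   }
   ```
   
   The setter would be registered via `StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG` (`rocksdb.config.setter`).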

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -454,6 +456,41 @@ public void flush() {
 }
 }
 
+public void flushCache() {
+RuntimeException firstException = null;
+// attempting to flush the stores
+if (!stores.isEmpty()) {
+log.debug("Flushing all store caches registered in the state 
manager: {}", stores);
+for (final StateStoreMetadata metadata : stores.values()) {
+final StateStore store = metadata.stateStore;
+
+try {
+// buffer should be flushed to send all records to changelog
+if (store instanceof TimeOrderedKeyValueBuffer) {

Review comment:
   So you're saying we'd still need to flush the suppression buffer but not 
the cache once we decouple caching from emitting? Or that we can remove this 
`flushCache` method altogether once that is done? Or that it will still do some 
flushing, but will not resemble the current `flushCache` method at all

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -454,6 +456,41 @@ public void flush() {
 }
 }
 
+public void flushCache() {
+RuntimeException firstException = null;
+// attempting to flush the stores
+if (!stores.isEmpty()) {
+log.debug("Flushing all store caches registered in the state 
manager: {}", stores);
+for (final StateStoreMetadata metadata : stores.values()) {
+final StateStore store = metadata.stateStore;
+
+try {
+// buffer should be flushed to send all records to changelog
+if (store instanceof TimeOrderedKeyValueBuffer) {

Review comment:
   Is there a ticket for that?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##
@@ -38,13 +41,39 @@
  */
 final class StateManagerUtil {
 static final String CHECKPOINT_FILE_NAME = ".checkpoint";
+static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;
 
 private StateManagerUtil() {}
 
 static RecordConverter converterForStore(final StateStore store) {
 return isTimestamped(store) ? rawValueToTimestampedValue() : 
identity();
 }
 
+static boolean checkpointNeeded(final boolean enforceCheckpoint,
+final Map<TopicPartition, Long> oldOffsetSnapshot,
+final Map<TopicPartition, Long> newOffsetSnapshot) {
+// we should always have the old snapshot post completing the register state stores;
+// if it is null it means the registration is not done and hence we should not overwrite the checkpoint
+if (oldOffsetSnapshot == null)
+return false;
+
+// if the previous snapshot is empty while the current snapshot is not then we should always checkpoint;
+// note if the task is stateless or stateful but no stores logged, the snapshot would also be empty
+// and hence it's okay to not checkpoint
+if (oldOffsetSnapshot.isEmpty() && !newOffsetSnapshot.isEmpty())
+return true;
+
+// we can checkpoint if the difference between the current and the previous snapshot is large enough
+long totalOffsetDelta = 0L;
+for (final Map.Entry<TopicPartition, Long> entry : newOffsetSnapshot.entrySet()) {
+totalOffsetDelta += Math.abs(oldOffsetSnapshot.getOrDefault(entry.getKey(), 0L) - entry.getValue());
+}
+
+// when enforcing checkpoint is required, we should overwrite the checkpoint if it is different from the old one;
+// otherwise, we only overwrite the checkpoint if it is largely different from the old one
+return enforceCheckpoint ? totalOffsetDelta > 0 : totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;

Review comment:
   I'm still not following -- I thought we no longer commit during c

[GitHub] [kafka] dajac commented on pull request #9072: KAFKA-10162; Make the rate based quota behave more like a Token Bucket (KIP-599, Part III)

2020-07-29 Thread GitBox


dajac commented on pull request #9072:
URL: https://github.com/apache/kafka/pull/9072#issuecomment-664923691


   @apovzner Thanks for your comments. I have updated the PR to incorporate 
your feedback. I have also fixed a bug in unrecord and added the javadoc.







[GitHub] [kafka] cyrusv opened a new pull request #9093: Throw error on when keys not found in FileConfigProvider

2020-07-29 Thread GitBox


cyrusv opened a new pull request #9093:
URL: https://github.com/apache/kafka/pull/9093


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] sasakitoa commented on pull request #9081: KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should not block infinitively

2020-07-29 Thread GitBox


sasakitoa commented on pull request #9081:
URL: https://github.com/apache/kafka/pull/9081#issuecomment-664725165


   Thank you for the kind comments. I have updated the PR to improve the javadoc and the related test code.







[GitHub] [kafka] huxihx edited a comment on pull request #8984: KAFKA-10227: Enforce cleanup policy to only contain compact or delete once

2020-07-29 Thread GitBox


huxihx edited a comment on pull request #8984:
URL: https://github.com/apache/kafka/pull/8984#issuecomment-664804260


   Ping @omkreddy @mimaison @dajac  for review.







[GitHub] [kafka] dajac commented on pull request #9091: MINOR; Make KafkaAdminClientTest.testDescribeLogDirsPartialFailure and KafkaAdminClientTest.testAlterReplicaLogDirsPartialFailure test more reli

2020-07-29 Thread GitBox


dajac commented on pull request #9091:
URL: https://github.com/apache/kafka/pull/9091#issuecomment-664848664


   @chia7712 Ack. I reverted `testMetadataRetries` back to the original version prior to #8864. That test is specifically about retries, so it doesn't make sense to remove the retries config.







[GitHub] [kafka] mjsax commented on pull request #9086: FIX: Remove staticmethod tag to be able to use logger of instance

2020-07-29 Thread GitBox


mjsax commented on pull request #9086:
URL: https://github.com/apache/kafka/pull/9086#issuecomment-664684265


   Merged to `trunk` and cherry-picked to `2.6`, `2.5`, `2.4`, and `2.3` 
branches.







[GitHub] [kafka] cmccabe commented on a change in pull request #9012: KAFKA-10270: A broker to controller channel manager

2020-07-29 Thread GitBox


cmccabe commented on a change in pull request #9012:
URL: https://github.com/apache/kafka/pull/9012#discussion_r461884961



##
File path: 
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit}
+
+import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import kafka.network.RequestChannel
+import kafka.utils.Logging
+import org.apache.kafka.clients._
+import org.apache.kafka.common.requests.AbstractRequest
+import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.common.Node
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network._
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.AbstractRequest.NoOpRequestBuilder
+import org.apache.kafka.common.security.JaasContext
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * This class manages the connection between a broker and the controller. It runs a single
+ * {@link BrokerToControllerRequestThread} which uses the broker's metadata cache as its own metadata to find
+ * and connect to the controller. The channel is async and runs the network connection in the background.
+ * The maximum number of in-flight requests are set to one to ensure orderly response from the controller, therefore
+ * care must be taken to not block on outstanding requests for too long.
+ */
+class BrokerToControllerChannelManager(metadataCache: kafka.server.MetadataCache,
+   time: Time,
+   metrics: Metrics,
+   config: KafkaConfig,
+   threadNamePrefix: Option[String] = None) extends Logging {
+  private val requestQueue = new LinkedBlockingQueue[BrokerToControllerQueueItem]
+  private val logContext = new LogContext(s"[broker-${config.brokerId}-to-controller] ")
+  private val manualMetadataUpdater = new ManualMetadataUpdater()
+  private val requestThread = newRequestThread
+
+  def start(): Unit = {
+requestThread.start()
+  }
+
+  def shutdown(): Unit = {
+requestThread.shutdown()
+requestThread.awaitShutdown()
+  }
+
+  private[server] def newRequestThread = {
+val brokerToControllerListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName)
+val brokerToControllerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol)
+
+val networkClient = {
+  val channelBuilder = ChannelBuilders.clientChannelBuilder(
+brokerToControllerSecurityProtocol,
+JaasContext.Type.SERVER,
+config,
+brokerToControllerListenerName,
+config.saslMechanismInterBrokerProtocol,
+time,
+config.saslInterBrokerHandshakeRequestEnable,
+logContext
+  )
+  val selector = new Selector(
+NetworkReceive.UNLIMITED,
+Selector.NO_IDLE_TIMEOUT_MS,
+metrics,
+time,
+"BrokerToControllerChannel",
+Map("BrokerId" -> config.brokerId.toString).asJava,
+false,
+channelBuilder,
+logContext
+  )
+  new NetworkClient(
+selector,
+manualMetadataUpdater,
+config.brokerId.toString,
+1,
+0,
+0,
+Selectable.USE_DEFAULT_BUFFER_SIZE,
+Selectable.USE_DEFAULT_BUFFER_SIZE,
+config.requestTimeoutMs,
+config.connectionSetupTimeoutMs,
+config.connectionSetupTimeoutMaxMs,
+ClientDnsLookup.DEFAULT,

Review comment:
   @abbccdda : Please switch this to `ClientDnsLookup.USE_ALL_DNS_IPS` to 
be consistent with the other NetworkClients, such as the one in 
`ControllerChannelManager`, etc.

##
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##
@@ -168,6 +168,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
 
   var kafkaController: KafkaController = null
 
+  var brokerToControllerChann

[GitHub] [kafka] rhauch commented on pull request #9051: KAFKA-10268: dynamic config like "--delete-config log.retention.ms" doesn't work

2020-07-29 Thread GitBox


rhauch commented on pull request #9051:
URL: https://github.com/apache/kafka/pull/9051#issuecomment-665124035


   This was also cherry-picked to `2.6`, but that branch has been frozen while 
we try to release AK 2.6.0. However, given that this is low-risk, I'll leave it 
on `2.6` and update 
[KAFKA-10268](https://issues.apache.org/jira/browse/KAFKA-10268) instead.
   
   @huxihx, next time please refrain from cherry-picking to frozen branches. 
Thanks!







[GitHub] [kafka] ableegoldman commented on pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics

2020-07-29 Thread GitBox


ableegoldman commented on pull request #9094:
URL: https://github.com/apache/kafka/pull/9094#issuecomment-665394762


   call for review @vvcephei @cadonna @guozhangwang 







[GitHub] [kafka] rhauch commented on pull request #9089: KAFKA-10224: Update jersey license from CDDL to EPLv2

2020-07-29 Thread GitBox


rhauch commented on pull request #9089:
URL: https://github.com/apache/kafka/pull/9089#issuecomment-665168618











[GitHub] [kafka] codelabor opened a new pull request #9090: toString pattern change like ProducerRecord.java

2020-07-29 Thread GitBox


codelabor opened a new pull request #9090:
URL: https://github.com/apache/kafka/pull/9090


   
   In most 'toString()' methods, '=' is used instead of ' = ', so I followed that convention.
   Currently, if you print 'ProducerRecord' and 'ConsumerRecord', the patterns look different, which looks strange:
   
   producerRecord: ProducerRecord(topic=, partition=null, headers= ...)
   consumerRecord: ConsumerRecord(topic = , partition = 0, ..., headers = ...)
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] hachikuji commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-29 Thread GitBox


hachikuji commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r461114945



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -292,21 +278,36 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
 
 while (offset < highWatermark) {
 try {
-final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
+final ConsumerRecords<byte[], byte[]> records =
+globalConsumer.poll(pollTimePlusRequestTimeoutPlusTaskTimeout);

Review comment:
   It does seem a bit weird here to add in the request timeout. Not sure I 
follow the reasoning behind that...









[GitHub] [kafka] rgibaiev commented on pull request #8575: KAFKA-8713 KIP-581 Add accept.optional.null to solve optional null

2020-07-29 Thread GitBox


rgibaiev commented on pull request #8575:
URL: https://github.com/apache/kafka/pull/8575#issuecomment-665225469


   Any update?







[GitHub] [kafka] showuon commented on pull request #9062: KAFKA-8098: fix the flaky test by disabling the auto commit to avoid member rejoining

2020-07-29 Thread GitBox


showuon commented on pull request #9062:
URL: https://github.com/apache/kafka/pull/9062#issuecomment-664801932


   Hi @feyman2016 @huxihx, could you help review this small PR? Thanks.







[GitHub] [kafka] dajac commented on pull request #9092: KAFKA-10163; Define `controller_mutation_rate` a Double instead of a Long

2020-07-29 Thread GitBox


dajac commented on pull request #9092:
URL: https://github.com/apache/kafka/pull/9092#issuecomment-664996150


   @rajinisivaram I have updated an existing test to use 1.5 instead of 2 to 
verify this. 
https://github.com/apache/kafka/pull/9092/files#diff-296d7b93103356535b8b891f019e4651R113.







[GitHub] [kafka] vvcephei commented on pull request #9004: KAFKA-10261: Introduce the KIP-478 apis with shims

2020-07-29 Thread GitBox


vvcephei commented on pull request #9004:
URL: https://github.com/apache/kafka/pull/9004#issuecomment-664740837


   Thanks for the review, @abbccdda ! I've addressed your feedback.







[GitHub] [kafka] vvcephei commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-29 Thread GitBox


vvcephei commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r461143961



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
   final RecordConverter recordConverter) {
 for (final TopicPartition topicPartition : topicPartitions) {
 globalConsumer.assign(Collections.singletonList(topicPartition));
+long offset;
 final Long checkpoint = checkpointFileCache.get(topicPartition);
 if (checkpoint != null) {
 globalConsumer.seek(topicPartition, checkpoint);
+offset = checkpoint;
 } else {
 
globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+final AtomicLong position = new AtomicLong();
+retryUntilSuccessOrThrowOnTaskTimeout(
+() -> position.set(globalConsumer.position(topicPartition)),
+String.format(
+"Failed to get position for partition %s. The broker may be transiently unavailable at the moment.",
+topicPartition
+)
+);
+offset = position.get();
 }
 
-long offset = globalConsumer.position(topicPartition);
 final Long highWatermark = highWatermarks.get(topicPartition);
 final RecordBatchingStateRestoreCallback stateRestoreAdapter =
 StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
 
 stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
 long restoreCount = 0L;
 
+long deadlineMs = NO_DEADLINE;
 while (offset < highWatermark) {
 try {
 final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
+if (records.isEmpty()) {
+if (taskTimeoutMs == 0L) {
+deadlineMs = maybeUpdateDeadlineOrThrow(

Review comment:
   Ok, thanks @mjsax. I just traced through the consumer code again, and have finally been able to see what you already knew: that `request.timeout.ms` is indeed the correct amount of time to wait.
   
   Namely, we send a fetch here: 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1285
   
   Which calls through to client.send here: 
https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L263
   
   Which fills in the `request.timeout.ms` config value here: 
https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L106
   
   Which uses it to construct a ClientRequest here: 
https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L129-L130
   
   Which then gets used to create an InFlightRequest when it gets sent here: 
https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L1248
   
   Which is later used to detect expired requests here: 
https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java#L162
   
   Which is used to list nodes (brokers) for which there is an expired request 
here: 
https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java#L179
   
   Which is then processed as a "disconnection" here: 
https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L803
   
   It also looks like the KafkaClient just does a tight-loop checking for a 
network response, so we don't really need any extra time to account for 
sampling errors. Also, it still seems like using the sum as the poll duration 
is just as good as using your retry logic, so I think the duration parameter is 
fine.
   
   My only remaining question, which maybe doesn't really matter one way or 
another, is whether `poll.ms` really belongs here or not. It seems like the 
desired semantics are accomplished by just waiting `request.timeout.ms` for the 
initial failure, and then an extra `task.timeout.ms` for any retries.
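
   To make those semantics concrete, here is a minimal sketch of that approach -- wait `request.timeout.ms` for the initial empty poll, then allow an extra `task.timeout.ms` for retries. It is only an illustration, not the PR's actual code; the method and variable names (`pollUntilCaughtUp`, `requestTimeoutMs`, `taskTimeoutMs`) are made up for the example.

```java
import java.time.Duration;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.TimeoutException;

final class BootstrapPollSketch {
    static <K, V> void pollUntilCaughtUp(final Consumer<K, V> consumer,
                                         final long highWatermark,
                                         final long requestTimeoutMs,
                                         final long taskTimeoutMs) {
        long offset = 0L;
        long deadlineMs = -1L; // -1 means the task timeout has not started yet
        while (offset < highWatermark) {
            // give the fetch request a fair chance to complete before starting task.timeout.ms
            final ConsumerRecords<K, V> records =
                consumer.poll(Duration.ofMillis(requestTimeoutMs + taskTimeoutMs));
            final long now = System.currentTimeMillis();
            if (records.isEmpty()) {
                if (deadlineMs == -1L) {
                    deadlineMs = now + taskTimeoutMs; // start the task timeout on the first failure
                } else if (now >= deadlineMs) {
                    throw new TimeoutException("no progress made within task.timeout.ms");
                }
            } else {
                deadlineMs = -1L;           // progress was made, so reset the deadline
                offset += records.count();  // simplified; the real code tracks actual record offsets
            }
        }
    }
}
```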

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -292,21 +278,36 @@

[GitHub] [kafka] chia7712 edited a comment on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-07-29 Thread GitBox


chia7712 edited a comment on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-665149860


   ```streams_standby_replica_test```  -> 
https://issues.apache.org/jira/browse/KAFKA-10287
   
   I will take a look at ```streams_broker_bounce_test``` 
(https://issues.apache.org/jira/browse/KAFKA-10292)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #9087: HOTFIX: Set session timeout and heartbeat interval to default to decrease flakiness

2020-07-29 Thread GitBox


cadonna commented on pull request #9087:
URL: https://github.com/apache/kafka/pull/9087#issuecomment-664621717


   @abbccdda Looking at this test class again, I even think that we could set 
session timeout and heartbeat interval to default for all tests. The main 
motivation to reduce them was to not need to change the application ID for each 
test (see https://github.com/apache/kafka/pull/8530#discussion_r413170584). 
However, in the meantime each test has its own application ID, so I guess it would be fine to use the defaults again here. Is this correct, @guozhangwang?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #9012: KAFKA-10270: A broker to controller channel manager

2020-07-29 Thread GitBox


abbccdda commented on pull request #9012:
URL: https://github.com/apache/kafka/pull/9012#issuecomment-665410633







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9004: KAFKA-10261: Introduce the KIP-478 apis with adapters

2020-07-29 Thread GitBox


abbccdda commented on a change in pull request #9004:
URL: https://github.com/apache/kafka/pull/9004#discussion_r462057744



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -764,12 +764,12 @@ private void connectProcessorAndStateStore(final String 
processorName,
 
 if (!sourceTopics.isEmpty()) {
 stateStoreNameToSourceTopics.put(stateStoreName,
-Collections.unmodifiableSet(sourceTopics));
+ 
Collections.unmodifiableSet(sourceTopics));

Review comment:
   format looks weird, maybe just do 4 spaces

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
##
@@ -57,12 +57,12 @@ public StateStore getStateStore(final String name) {
 
 @SuppressWarnings("unchecked")
 @Override
-public  void forward(final K key, final V value) {
-final ProcessorNode previousNode = currentNode();
+public  void forward(final KIn key, final VIn value) {
+final ProcessorNode previousNode = currentNode();
 try {
-for (final ProcessorNode child : currentNode().children()) {
+for (final ProcessorNode child : 
currentNode().children()) {
 setCurrentNode(child);
-((ProcessorNode) child).process(key, value);
+((ProcessorNode) child).process(key, 
value); // FIXME

Review comment:
   Could we leave a clearer comment on what needs to be fixed?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.To;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Map;
+
+/**
+ * Processor context interface.
+ *
+ * @param  a bound on the types of keys that may be forwarded
+ * @param  a bound on the types of values that may be forwarded
+ */
+public interface ProcessorContext {
+
+/**
+ * Returns the application id.
+ *
+ * @return the application id
+ */
+String applicationId();
+
+/**
+ * Returns the task id.
+ *
+ * @return the task id
+ */
+TaskId taskId();
+
+/**
+ * Returns the default key serde.
+ *
+ * @return the key serializer
+ */
+Serde keySerde();
+
+/**
+ * Returns the default value serde.
+ *
+ * @return the value serializer
+ */
+Serde valueSerde();
+
+/**
+ * Returns the state directory for the partition.
+ *
+ * @return the state directory
+ */
+File stateDir();
+
+/**
+ * Returns Metrics instance.
+ *
+ * @return StreamsMetrics
+ */
+StreamsMetrics metrics();
+
+/**
+ * Registers and possibly restores the specified storage engine.
+ *
+ * @param store the storage engine
+ * @param stateRestoreCallback the restoration callback logic for 
log-backed state stores upon restart
+ *
+ * @throws IllegalStateException If store gets registered after 
initialized is already finished
+ * @throws StreamsException if the store's change log does not contain the 
partition
+ */
+void register(final StateStore store,
+  final StateRestoreCallback stateRestoreCallback);
+
+/**
+ * Get the state store given the store name.
+ *
+ * @pa

[GitHub] [kafka] cmccabe commented on pull request #9012: KAFKA-10270: A broker to controller channel manager

2020-07-29 Thread GitBox


cmccabe commented on pull request #9012:
URL: https://github.com/apache/kafka/pull/9012#issuecomment-665288947







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman opened a new pull request #9094: KAFKA-10054: add TRACE-level e2e latency metrics

2020-07-29 Thread GitBox


ableegoldman opened a new pull request #9094:
URL: https://github.com/apache/kafka/pull/9094


   Adds avg, min, and max e2e latency metrics at the new TRACE level. Also adds 
the missing `avg` task-level metric at the INFO level.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda edited a comment on pull request #9012: KAFKA-10270: A broker to controller channel manager

2020-07-29 Thread GitBox


abbccdda edited a comment on pull request #9012:
URL: https://github.com/apache/kafka/pull/9012#issuecomment-665410633


   Got 2/3 green, with one Jenkins job terminated unexpectedly. 
https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/7602/console



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #9081: KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should not block infinitively

2020-07-29 Thread GitBox


abbccdda commented on pull request #9081:
URL: https://github.com/apache/kafka/pull/9081#issuecomment-665157437







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9081: KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should not block infinitively

2020-07-29 Thread GitBox


abbccdda commented on a change in pull request #9081:
URL: https://github.com/apache/kafka/pull/9081#discussion_r461313377



##
File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala
##
@@ -406,6 +406,26 @@ class TransactionsTest extends KafkaServerTestHarness {
 TestUtils.waitUntilTrue(() => 
offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot 
read committed offset")
   }
 
+  @Test(expected = classOf[TimeoutException])
+  def testSendOffsetsToTransactionTimeout(): Unit = {
+val producer = createTransactionalProducer("transactionProducer", 
maxBlockMs = 1000)
+producer.initTransactions()
+producer.beginTransaction()
+producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, 
"foo".getBytes, "bar".getBytes))
+
+for (i <- 0 until servers.size)

Review comment:
   nit: could use servers.indices

##
File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala
##
@@ -406,6 +406,26 @@ class TransactionsTest extends KafkaServerTestHarness {
 TestUtils.waitUntilTrue(() => 
offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot 
read committed offset")
   }
 
+  @Test(expected = classOf[TimeoutException])
+  def testSendOffsetsToTransactionTimeout(): Unit = {
+val producer = createTransactionalProducer("transactionProducer", 
maxBlockMs = 1000)
+producer.initTransactions()
+producer.beginTransaction()
+producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, 
"foo".getBytes, "bar".getBytes))
+
+for (i <- 0 until servers.size)
+  killBroker(i)
+
+try {
+  producer.sendOffsetsToTransaction(Map(

Review comment:
   Do we have unit test coverage for the max-blocking behavior of the other transaction APIs as well? Do you mind adding them as separate tests that share the same module?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-29 Thread GitBox


mjsax commented on pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#issuecomment-664773350


   @vvcephei @abbccdda @guozhangwang @hachikuji -- I updated this PR according 
to your discussions. Needed to squash for rebasing to resolve conflicts.
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9004: KAFKA-10261: Introduce the KIP-478 apis with shims

2020-07-29 Thread GitBox


vvcephei commented on a change in pull request #9004:
URL: https://github.com/apache/kafka/pull/9004#discussion_r461271276



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -880,40 +880,41 @@ public synchronized ProcessorTopology 
buildGlobalStateTopology() {
 return globalGroups;
 }
 
+@SuppressWarnings("unchecked")
 private ProcessorTopology build(final Set nodeGroup) {
 Objects.requireNonNull(applicationId, "topology has not completed 
optimization");
 
-final Map> processorMap = new 
LinkedHashMap<>();
-final Map> topicSourceMap = new HashMap<>();
-final Map> topicSinkMap = new HashMap<>();
+final Map> processorMap = new 
LinkedHashMap<>();
+final Map> topicSourceMap = new 
HashMap<>();
+final Map> topicSinkMap = new HashMap<>();
 final Map stateStoreMap = new LinkedHashMap<>();
 final Set repartitionTopics = new HashSet<>();
 
 // create processor nodes in a topological order ("nodeFactories" is 
already topologically sorted)
 // also make sure the state store map values following the insertion 
ordering
-for (final NodeFactory factory : nodeFactories.values()) {
+for (final NodeFactory factory : nodeFactories.values()) {
 if (nodeGroup == null || nodeGroup.contains(factory.name)) {
-final ProcessorNode node = factory.build();
+final ProcessorNode node = factory.build();
 processorMap.put(node.name(), node);
 
 if (factory instanceof ProcessorNodeFactory) {
 buildProcessorNode(processorMap,
stateStoreMap,
-   (ProcessorNodeFactory) factory,
-   node);
+   (ProcessorNodeFactory) 
factory,
+   (ProcessorNode) node);
 
 } else if (factory instanceof SourceNodeFactory) {
 buildSourceNode(topicSourceMap,
 repartitionTopics,
-(SourceNodeFactory) factory,
-(SourceNode) node);
+(SourceNodeFactory) factory,
+(SourceNode) node);
 
 } else if (factory instanceof SinkNodeFactory) {
 buildSinkNode(processorMap,
   topicSinkMap,
   repartitionTopics,
-  (SinkNodeFactory) factory,
-  (SinkNode) node);
+  (SinkNodeFactory) factory,
+  (SinkNode) 
node);

Review comment:
   They have subtly different meanings, which I'm not 100% clear on all the time. I'm not sure if I had to change this one, or if it was an accident. I'll give it a closer look.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorShim.java
##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+
+public final class ProcessorShim implements 
Processor {

Review comment:
   I think "adapter" is the standard design pattern name for this type of 
thing. Not sure why I thought "shim" was a good choice in the heat of the 
moment. Maybe because I'm kind of slipping these classes in the middle to make 
everything line up? I can change them to "adapter".
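
   For readers unfamiliar with the pattern, a minimal sketch of what such an adapter looks like, using hypothetical `OldProcessor`/`NewProcessor` interfaces rather than the PR's actual classes:

```java
// Hypothetical interfaces standing in for the old and new processor APIs.
interface OldProcessor<K, V> {
    void process(K key, V value);
}

interface NewProcessor<KIn, VIn> {
    void process(KIn key, VIn value);
}

// The adapter exposes the new interface while delegating to an old-style instance,
// which is what lets the two API generations line up during the migration.
final class ProcessorAdapterSketch<KIn, VIn> implements NewProcessor<KIn, VIn> {
    private final OldProcessor<KIn, VIn> delegate;

    ProcessorAdapterSketch(final OldProcessor<KIn, VIn> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void process(final KIn key, final VIn value) {
        delegate.process(key, value);
    }
}
```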

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -880,40 +880,41 @@ public synchronized ProcessorTopology 
buildGlobalStateTopology() {
 return globalGroups;
 }
 
+@SuppressWarnings("unchecked")
 private ProcessorTopology build(final Set nodeGroup) {
 Objects.requireNonNull

[GitHub] [kafka] cyrusv closed pull request #9093: Throw error on when keys not found in FileConfigProvider

2020-07-29 Thread GitBox


cyrusv closed pull request #9093:
URL: https://github.com/apache/kafka/pull/9093


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets

2020-07-29 Thread GitBox


chia7712 commented on pull request #9066:
URL: https://github.com/apache/kafka/pull/9066#issuecomment-664764918







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #9096: Add comments to constrainedAssign and generalAssign method

2020-07-29 Thread GitBox


showuon commented on pull request #9096:
URL: https://github.com/apache/kafka/pull/9096#issuecomment-665472409


   @ableegoldman @vahidhashemian , please help review this PR to add comments 
to the assign methods. Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac opened a new pull request #9092: KAFKA-10163; Define `controller_mutation_rate` a Double instead of a Long

2020-07-29 Thread GitBox


dajac opened a new pull request #9092:
URL: https://github.com/apache/kafka/pull/9092


   First tests have shown that `controller_mutation_rate` can be quite low (e.g. around 1) in clusters with multiple tenants. At the moment, the rate is defined as a Long, which rules out fractional values and thus limits how low the quota can be set. Defining it as a Double seems more appropriate.
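
   As an illustration only (not the actual broker config definition), a Double-typed `ConfigDef` entry can express fractional rates that a Long-typed entry cannot:

```java
import org.apache.kafka.common.config.ConfigDef;

final class MutationRateConfigSketch {
    static ConfigDef sketch() {
        return new ConfigDef()
            // as a Long, the smallest meaningful positive quota is 1 mutation/sec
            .define("controller_mutation_rate_as_long", ConfigDef.Type.LONG, Long.MAX_VALUE,
                    ConfigDef.Importance.MEDIUM, "illustrative only")
            // as a Double, fractional quotas such as 0.5 mutations/sec become expressible
            .define("controller_mutation_rate_as_double", ConfigDef.Type.DOUBLE, Double.MAX_VALUE,
                    ConfigDef.Importance.MEDIUM, "illustrative only");
    }
}
```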
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-07-29 Thread GitBox


chia7712 commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-665149860


   ```streams_standby_replica_test```  -> 
https://issues.apache.org/jira/browse/KAFKA-10287
   
   I will take a look at ```streams_broker_bounce_test```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

2020-07-29 Thread GitBox


hachikuji commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r461173162



##
File path: clients/src/main/resources/common/message/FetchRequest.json
##
@@ -55,35 +55,35 @@
   "about": "The minimum bytes to accumulate in the response." },
 { "name": "MaxBytes", "type": "int32", "versions": "3+", "default": 
"0x7fff", "ignorable": true,
   "about": "The maximum bytes to fetch.  See KIP-74 for cases where this 
limit may not be honored." },
-{ "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": 
"0", "ignorable": false,
+{ "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": 
"0", "ignorable": true,

Review comment:
   I guess the implicit expectation is that if the protocol does not 
support the `read_committed` isolation level, then it wouldn't have 
transactional data anyway, so reverting to `read_uncommitted` is safe. Can't 
find a fault with that.

##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
##
@@ -146,7 +147,7 @@ public static AbstractRequest parseRequest(ApiKeys apiKey, 
short apiVersion, Str
 case PRODUCE:
 return new ProduceRequest(struct, apiVersion);
 case FETCH:
-return new FetchRequest(struct, apiVersion);
+return new FetchRequest(new FetchRequestData(struct, 
apiVersion), apiVersion);

Review comment:
   nit: any reason not to stick with the same constructor convention as the 
other requests?

##
File path: 
clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} 
objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on 
this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is 
made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send 
consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying 
record-set's transfer logic.
+ *
+ * For example,
+ *
+ * 
+ * recordsWritable.writeInt(10);
+ * recordsWritable.writeRecords(records1);
+ * recordsWritable.writeInt(20);
+ * recordsWritable.writeRecords(records2);
+ * recordsWritable.writeInt(30);
+ * recordsWritable.flush();
+ * 
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care 
must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is 
flushed to the consumer. This class is
+ * intended to be used with {@link 
org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+private final String dest;
+private final Consumer sendConsumer;
+private final ByteBuffer buffer;
+private int mark;
+
+public RecordsWriter(String dest, int totalSize, Consumer 
sendConsumer) {

Review comment:
   Could we rename `totalSize` so that it is clear that it does not cover the record sizes? Maybe `totalOverheadSize` or `totalNonRecordSize` or something like that.

##
File path: 
clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information r

[GitHub] [kafka] abbccdda commented on pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-07-29 Thread GitBox


abbccdda commented on pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#issuecomment-665189199







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bob-barrett commented on a change in pull request #9054: KAFKA-10282: Remove Log metrics immediately when deleting log

2020-07-29 Thread GitBox


bob-barrett commented on a change in pull request #9054:
URL: https://github.com/apache/kafka/pull/9054#discussion_r461721737



##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -932,6 +927,7 @@ class LogManager(logDirs: Seq[File],
 val logsToCheckpoint = logsInDir(logDir)
 checkpointRecoveryOffsetsAndCleanSnapshotsInDir(logDir, 
logsToCheckpoint, ArrayBuffer.empty)
 checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
+sourceLog.removeLogMetrics()

Review comment:
   Good catch. Opened https://issues.apache.org/jira/browse/KAFKA-10320 to 
track it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed [WIP]

2020-07-29 Thread GitBox


guozhangwang commented on pull request #9083:
URL: https://github.com/apache/kafka/pull/9083#issuecomment-664699699


   cc @ableegoldman @mjsax 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-29 Thread GitBox


abbccdda commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r462047068



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -274,30 +252,74 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
   final RecordConverter recordConverter) {
 for (final TopicPartition topicPartition : topicPartitions) {
 globalConsumer.assign(Collections.singletonList(topicPartition));
+long offset;
 final Long checkpoint = checkpointFileCache.get(topicPartition);
 if (checkpoint != null) {
 globalConsumer.seek(topicPartition, checkpoint);
+offset = checkpoint;
 } else {
 
globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+offset = retryUntilSuccessOrThrowOnTaskTimeout(
+() -> globalConsumer.position(topicPartition),
+String.format(
+"Failed to get position for partition %s. The broker 
may be transiently unavailable at the moment.",
+topicPartition
+)
+);
 }
 
-long offset = globalConsumer.position(topicPartition);
 final Long highWatermark = highWatermarks.get(topicPartition);
 final RecordBatchingStateRestoreCallback stateRestoreAdapter =
 StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
 
 stateRestoreListener.onRestoreStart(topicPartition, storeName, 
offset, highWatermark);
 long restoreCount = 0L;
 
-while (offset < highWatermark) {
-final ConsumerRecords records = 
globalConsumer.poll(pollTime);
+while (offset < highWatermark) { // when we "fix" this loop 
(KAFKA-7380 / KAFKA-10317)
+ // we should update the `poll()` 
timeout below
+
+// we ignore `poll.ms` config during bootstrapping phase and
+// apply `request.timeout.ms` plus `task.timeout.ms` instead
+//
+// the reason is, that `poll.ms` might be too short to give a 
fetch request a fair chance
+// to actually complete and we don't want to start 
`task.timeout.ms` too early
+//
+// we also pass `task.timeout.ms` into `poll()` directly right 
now as it simplifies our own code:
+// if we don't pass it in, we would just track the timeout 
ourselves and call `poll()` again
+// in our own retry loop; by passing the timeout we can reuse 
the consumer's internal retry loop instead
+//
+// note that using `request.timeout.ms` provides a 
conservative upper bound for the timeout;
+// this implies that we might start `task.timeout.ms` 
"delayed" -- however, starting the timeout
+// delayed is preferable (as it's more robust) than starting 
it too early
+//
+// TODO https://issues.apache.org/jira/browse/KAFKA-10315
+//   -> do a more precise timeout handling if `poll` would 
throw an exception if a fetch request fails
+//  (instead of letting the consumer retry fetch requests 
silently)
+//
+// TODO https://issues.apache.org/jira/browse/KAFKA-10317 and
+//  https://issues.apache.org/jira/browse/KAFKA-7380
+//  -> don't pass in `task.timeout.ms` to stay responsive if 
`KafkaStreams#close` gets called
+final ConsumerRecords records = 
globalConsumer.poll(requestTimeoutPlusTaskTimeout);
+if (records.isEmpty()) {
+// this will always throw
+maybeUpdateDeadlineOrThrow(time.milliseconds());

Review comment:
   Could we just throw here?

##
File path: docs/streams/developer-guide/config-streams.html
##
@@ -326,13 +321,18 @@ bootstrap.serversstate.cleanup.delay.ms
 Low
 The amount of time in milliseconds to wait before 
deleting state when a partition has migrated.
-60 milliseconds
+60 milliseconds (10 minutes)
   
   state.dir
 High
 Directory location for state stores.
 /tmp/kafka-streams
   
+  task.timeout.ms
+Medium

Review comment:
   @mjsax could we do a screenshot to make sure it looks good on the 
web-page?

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##
@@ -671,19 +1211,21 @@ private void writeCorruptCheckpoint() throws IOException 
{
 }
 }
 
-private void initializeCons

[GitHub] [kafka] vvcephei commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets

2020-07-29 Thread GitBox


vvcephei commented on pull request #9066:
URL: https://github.com/apache/kafka/pull/9066#issuecomment-665126186







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] yeralin commented on pull request #6592: KAFKA-8326: Introduce List Serde

2020-07-29 Thread GitBox


yeralin commented on pull request #6592:
URL: https://github.com/apache/kafka/pull/6592#issuecomment-665078937


   Any updates?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] sasakitoa commented on a change in pull request #9081: KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should not block infinitively

2020-07-29 Thread GitBox


sasakitoa commented on a change in pull request #9081:
URL: https://github.com/apache/kafka/pull/9081#discussion_r461264833



##
File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala
##
@@ -406,6 +406,26 @@ class TransactionsTest extends KafkaServerTestHarness {
 TestUtils.waitUntilTrue(() => 
offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot 
read committed offset")
   }
 
+  @Test(expected = classOf[TimeoutException])
+  def testSendOffsetsToTransactionTimeout(): Unit = {
+val producer = createTransactionalProducer("transactionProducer", 
maxBlockMs = 1000)
+producer.initTransactions()
+producer.beginTransaction()
+producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, 
"foo".getBytes, "bar".getBytes))
+
+for (i <- 0 until servers.size)
+  killBroker(i)
+
+val offsets = new mutable.HashMap[TopicPartition, 
OffsetAndMetadata]().asJava
+offsets.put(new TopicPartition(topic1, 0), new OffsetAndMetadata(0))
+try {
+  producer.sendOffsetsToTransaction(offsets, "test-group")

Review comment:
   Replaced `mutable.HashMap` with `Map`.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##
@@ -687,7 +687,7 @@ public void sendOffsetsToTransaction(Map offs
 throwIfProducerClosed();
 TransactionalRequestResult result = 
transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
 sender.wakeup();
-result.await();
+result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);

Review comment:
   I wrote some description related to TimeoutException and 
InterruptedException

##
File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala
##
@@ -406,6 +406,26 @@ class TransactionsTest extends KafkaServerTestHarness {
 TestUtils.waitUntilTrue(() => 
offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot 
read committed offset")
   }
 
+  @Test(expected = classOf[TimeoutException])
+  def testSendOffsetsToTransactionTimeout(): Unit = {
+val producer = createTransactionalProducer("transactionProducer", 
maxBlockMs = 1000)
+producer.initTransactions()
+producer.beginTransaction()
+producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, 
"foo".getBytes, "bar".getBytes))
+
+for (i <- 0 until servers.size)
+  killBroker(i)
+
+val offsets = new mutable.HashMap[TopicPartition, 
OffsetAndMetadata]().asJava
+offsets.put(new TopicPartition(topic1, 0), new OffsetAndMetadata(0))
+try {
+  producer.sendOffsetsToTransaction(offsets, "test-group")

Review comment:
   Replaced `mutable.HashMap` with `Map`.

##
File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala
##
@@ -406,6 +406,26 @@ class TransactionsTest extends KafkaServerTestHarness {
 TestUtils.waitUntilTrue(() => 
offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot 
read committed offset")
   }
 
+  @Test(expected = classOf[TimeoutException])
+  def testSendOffsetsToTransactionTimeout(): Unit = {
+val producer = createTransactionalProducer("transactionProducer", 
maxBlockMs = 1000)
+producer.initTransactions()
+producer.beginTransaction()
+producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, 
"foo".getBytes, "bar".getBytes))
+
+for (i <- 0 until servers.size)

Review comment:
   Modified to use `indices` instead of `size`, thanks.

##
File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala
##
@@ -406,6 +406,26 @@ class TransactionsTest extends KafkaServerTestHarness {
 TestUtils.waitUntilTrue(() => 
offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot 
read committed offset")
   }
 
+  @Test(expected = classOf[TimeoutException])
+  def testSendOffsetsToTransactionTimeout(): Unit = {
+val producer = createTransactionalProducer("transactionProducer", 
maxBlockMs = 1000)
+producer.initTransactions()
+producer.beginTransaction()
+producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, 
"foo".getBytes, "bar".getBytes))
+
+for (i <- 0 until servers.size)
+  killBroker(i)
+
+try {
+  producer.sendOffsetsToTransaction(Map(

Review comment:
   I added some timeout tests for initTransactions, commitTransaction, and abortTransaction using the same base method.
   Is this implementation what you intended?

##
File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala
##
@@ -406,6 +406,26 @@ class TransactionsTest extends KafkaServerTestHarness {
 TestUtils.waitUntilTrue(() => 
offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot 
read committed offset")
   }
 
+  @Test(expected = classOf[TimeoutException])
+  def testSendOffsetsToTransactionTimeout(): Unit = {
+val producer = cre

[GitHub] [kafka] rajinisivaram commented on pull request #9092: KAFKA-10163; Define `controller_mutation_rate` a Double instead of a Long

2020-07-29 Thread GitBox


rajinisivaram commented on pull request #9092:
URL: https://github.com/apache/kafka/pull/9092#issuecomment-664990134


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #6592: KAFKA-8326: Introduce List Serde

2020-07-29 Thread GitBox


mjsax commented on pull request #6592:
URL: https://github.com/apache/kafka/pull/6592#issuecomment-665304810


   @yeralin -- your PR is in my review queue. Not sure how quickly I will find 
time to have a look though atm -- maybe next week, but I can't promise.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #9081: KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should not block infinitively

2020-07-29 Thread GitBox


guozhangwang commented on pull request #9081:
URL: https://github.com/apache/kafka/pull/9081#issuecomment-664690277


   Any blocking API should be covered by `max.block.ms`, so I think this is rather a bug fix.
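
   A minimal sketch of the expected behavior (the bootstrap address, topic, and group name are placeholders, and the error handling is simplified): with `max.block.ms` set, `sendOffsetsToTransaction` should surface a `TimeoutException` instead of blocking indefinitely when the brokers become unreachable.

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;

final class BoundedSendOffsetsSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionProducer");
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000); // bound every blocking call to 1s
        try (KafkaProducer<byte[], byte[]> producer =
                 new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) {
            producer.initTransactions();
            producer.beginTransaction();
            try {
                producer.sendOffsetsToTransaction(
                    Collections.singletonMap(new TopicPartition("topic1", 0), new OffsetAndMetadata(0)),
                    "test-group");
            } catch (final TimeoutException e) {
                // with the fix, unreachable brokers surface here after max.block.ms;
                // the caller can then retry, abort the transaction, or fail fast
            }
        }
    }
}
```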



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-29 Thread GitBox


mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r461302625



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
   final RecordConverter recordConverter) {
 for (final TopicPartition topicPartition : topicPartitions) {
 globalConsumer.assign(Collections.singletonList(topicPartition));
+long offset;
 final Long checkpoint = checkpointFileCache.get(topicPartition);
 if (checkpoint != null) {
 globalConsumer.seek(topicPartition, checkpoint);
+offset = checkpoint;
 } else {
 
globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+final AtomicLong position = new AtomicLong();
+retryUntilSuccessOrThrowOnTaskTimeout(
+() -> 
position.set(globalConsumer.position(topicPartition)),
+String.format(
+"Failed to get position for partition %s. The broker 
may be transiently unavailable at the moment.",
+topicPartition
+)
+);
+offset = position.get();
 }
 
-long offset = globalConsumer.position(topicPartition);
 final Long highWatermark = highWatermarks.get(topicPartition);
 final RecordBatchingStateRestoreCallback stateRestoreAdapter =
 StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
 
 stateRestoreListener.onRestoreStart(topicPartition, storeName, 
offset, highWatermark);
 long restoreCount = 0L;
 
+long deadlineMs = NO_DEADLINE;
 while (offset < highWatermark) {
 try {
 final ConsumerRecords records = 
globalConsumer.poll(pollTime);
+if (records.isEmpty()) {
+if (taskTimeoutMs == 0L) {
+deadlineMs = maybeUpdateDeadlineOrThrow(

Review comment:
   Thanks for digging into it @vvcephei -- The question about `pollTimeout` 
is fair. I guess for default configs it might not matter much, as the default 
is 100ms IIRC.
   
   At the same time, we apply the same `pollTimeout` during regular processing, and as a matter of fact, for this case, a user might want to use a longer poll-timeout, as otherwise the thread would just "busy wait" anyway (only the responsiveness to a shutdown of the app should be considered).
   
   Thus, it might actually make sense to exclude `pollTimeout` completely and 
only use `requestTimeout + taskTimeout`. Again, using `taskTimeout` in `poll()` 
reduces the responsiveness of a shutdown -- however, atm during bootstrapping 
we ignore a shutdown signal anyway, hence, for now we don't make the situation 
worse. I created a ticket to fix this: 
https://issues.apache.org/jira/browse/KAFKA-10317 and will add a comment for 
now.
   
   Short related note: actually using `requestTimeout` seems to be a 
conservative upper bound for poll(). A request could fail with a different 
error before `requestTimeout` hits and would be retried internally for this 
case -- if this happens, we might want to start the `taskTimeout` earlier. 
However, we don't have any means atm to detect this case. Thus, using 
`requestTimeout` is the best option we have right now (because triggering 
`taskTimeout` too early seems to be worse than triggering it too late). I 
created a ticket though that might allow us to improve the code later: 
https://issues.apache.org/jira/browse/KAFKA-10315





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda opened a new pull request #9095: KAFKA-10321: fix infinite blocking for global stream thread startup

2020-07-29 Thread GitBox


abbccdda opened a new pull request #9095:
URL: https://github.com/apache/kafka/pull/9095


   In the unit test `shouldDieOnInvalidOffsetExceptionDuringStartup` for JDK 
11, we spotted a case where a global stream thread startup would stall if it 
fails immediately upon the first poll. The reason is that the `start()` function only checks whether the thread is *not running*, as it needs to block until initialization finishes. However, if the thread transitions to `DEAD` immediately, the `start()` call blocks forever.
   
   Use the failed unit test to verify it works.
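
   A minimal sketch (illustrative, not the actual GlobalStreamThread code) of the fix described above: block until initialization either completes or the thread dies, instead of waiting only for the "running" flag.

```java
final class StartupWaitSketch {
    enum State { CREATED, RUNNING, DEAD }

    private volatile State state = State.CREATED;

    synchronized void setState(final State newState) {
        state = newState;
        notifyAll();
    }

    synchronized void waitUntilRunningOrDead() throws InterruptedException {
        // Waiting only on "not yet RUNNING" hangs forever if the thread jumps straight to DEAD.
        while (state != State.RUNNING && state != State.DEAD) {
            wait();
        }
        if (state == State.DEAD) {
            throw new IllegalStateException("Global stream thread died during startup");
        }
    }
}
```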
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] pan3793 commented on pull request #8575: KAFKA-8713 KIP-581 Add accept.optional.null to solve optional null

2020-07-29 Thread GitBox


pan3793 commented on pull request #8575:
URL: https://github.com/apache/kafka/pull/8575#issuecomment-665405076


   Fixed the mail thread, do you have any comments on the discussion and KIP 
doc? @rhauch 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9094: KAFKA-10054: add TRACE-level e2e latency metrics

2020-07-29 Thread GitBox


ableegoldman commented on a change in pull request #9094:
URL: https://github.com/apache/kafka/pull/9094#discussion_r461993688



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
##
@@ -198,7 +198,7 @@
 .define(METRICS_RECORDING_LEVEL_CONFIG,
 Type.STRING,
 Sensor.RecordingLevel.INFO.toString(),
-
in(Sensor.RecordingLevel.INFO.toString(), 
Sensor.RecordingLevel.DEBUG.toString()),
+
in(Sensor.RecordingLevel.INFO.toString(), 
Sensor.RecordingLevel.DEBUG.toString(), Sensor.RecordingLevel.TRACE.toString()),

Review comment:
   It's kind of a bummer that we can't just add the new TRACE level for 
Streams only; we have to add it to all the clients that Streams passes its 
configs down to. We could check for the new TRACE level and strip it off before 
passing the configs on to the clients, but that just seems like asking for 
trouble.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java
##
@@ -88,13 +92,6 @@ private TaskMetrics() {}
 private static final String NUM_BUFFERED_RECORDS_DESCRIPTION = "The count 
of buffered records that are polled " +
 "from consumer and not yet processed for this active task";
 
-private static final String RECORD_E2E_LATENCY = "record-e2e-latency";

Review comment:
   Moved the common descriptions to StreamsMetricsImpl

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -227,6 +234,14 @@ protected Bytes keyBytes(final K key) {
 return byteEntries;
 }
 
+private void maybeRecordE2ELatency() {
+if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {

Review comment:
   For KV stores, we just compare the current time with the current 
record's timestamp

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
##
@@ -248,4 +253,12 @@ public void close() {
 private Bytes keyBytes(final K key) {
 return Bytes.wrap(serdes.rawKey(key));
 }
+
+private void maybeRecordE2ELatency() {
+if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {

Review comment:
   For session and window stores, we also just compare the current time 
with the current record's timestamp when `put` is called. This can mean the e2e 
latency is measured several times on the same record, for example in a windowed 
aggregation.
   At first I thought that didn't make sense, but now I think it's actually 
exactly what we want. First of all, it means we can actually account for the 
latency between calls to `put` within a processor. For simple point inserts 
this might not be a huge increase on the scale of ms, but more complex 
processing may benefit from seeing this granularity of information. If they 
don't want it, well, that's why we introduced `TRACE`
   
   Second, while it might seem like we're over-weighting some records by 
measuring the e2e latency on them more than others, I'm starting to think this 
actually makes more sense than not: the big picture benefit/use case for the 
e2e latency metric is less "how long for this record to get sent downstream" 
and more "how long for this record to be reflected in the state store/IQ 
results". Given that, each record should be weighted by its actual proportion 
of the state store. You aren't querying individual records (in a window store), 
you're querying the windows themselves
   
   I toyed around with the idea of measuring the e2e latency relative to the 
window time, instead of the record timestamp, but ultimately couldn't find any 
sense in that. 
   Thoughts?
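
   A minimal sketch of the measurement being described: on every `put`, record the gap between wall-clock time and the record's own timestamp. The sensor wiring here is illustrative and not the actual metered-store code.

```java
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;

final class E2eLatencySketch {
    private final Sensor e2eLatencySensor;
    private final Time time;

    E2eLatencySketch(final Sensor e2eLatencySensor, final Time time) {
        this.e2eLatencySensor = e2eLatencySensor;
        this.time = time;
    }

    void maybeRecordE2eLatency(final long recordTimestampMs) {
        if (e2eLatencySensor.shouldRecord()) {
            // the same record may be measured on every put, e.g. once per window it updates
            e2eLatencySensor.record(time.milliseconds() - recordTimestampMs);
        }
    }
}
```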

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
##
@@ -668,6 +671,9 @@ private void checkKeyValueStoreMetrics(final String 
group0100To24,
 checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_CURRENT, 0);
 checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_AVG, 0);
 checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_MAX, 0);
+checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_AVG, 
expectedNumberofE2ELatencyMetrics);
+checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MIN, 
expectedNumberofE2ELatencyMetrics);
+checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MAX, 
expectedNumberofE2ELatencyMetrics);

Review comment:
   Ok there's something I'm not understanding about this test and/or the 
built-in metrics version. For some reason, the KV-store metrics are 0 when 
`METRICS_0100_TO_24` is used, and 1 (as expected) when the latest version is used. I feel like this is wrong, and it should al

[GitHub] [kafka] guozhangwang commented on pull request #8993: KAFKA-10173: Use SmokeTest for upgrade system tests (#8938)

2020-07-29 Thread GitBox


guozhangwang commented on pull request #8993:
URL: https://github.com/apache/kafka/pull/8993#issuecomment-665190912


   Sorry for the long delay @vvcephei , the PR lgtm!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on pull request #8936: KAFKA-10207: Fixed padded timeindex causing premature data deletion

2020-07-29 Thread GitBox


junrao commented on pull request #8936:
URL: https://github.com/apache/kafka/pull/8936#issuecomment-665148070


   I think we agreed to change the logic to do the index sanity check on index 
opening.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10322) InMemoryWindowStore restore keys format incompatibility (lack of sequenceNumber in keys on topic)

2020-07-29 Thread Jira
Tomasz Bradło created KAFKA-10322:
-

 Summary: InMemoryWindowStore restore keys format incompatibility 
(lack of sequenceNumber in keys on topic)
 Key: KAFKA-10322
 URL: https://issues.apache.org/jira/browse/KAFKA-10322
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.5.0
 Environment: windows/linux
Reporter: Tomasz Bradło


I have a regular groupBy & count stream configuration:
{code:java}
   
fun addStream(kStreamBuilder: StreamsBuilder) {
    val storeSupplier = Stores.inMemoryWindowStore("count-store",
        Duration.ofDays(10),
        Duration.ofDays(1),
        false)
    val storeBuilder: StoreBuilder<WindowStore<CountableEvent, Long>> = Stores
        .windowStoreBuilder(storeSupplier, JsonSerde(CountableEvent::class.java), Serdes.Long())

    kStreamBuilder
        .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
        .map { _, jsonRepresentation -> KeyValue(eventsCountingDeserializer.deserialize(jsonRepresentation), null) }
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofDays(1)))
        .count(Materialized.with(JsonSerde(CountableEvent::class.java), Serdes.Long()))
        .toStream()
        .to("topic1-count")

    val storeConsumed = Consumed.with(
        WindowedSerdes.TimeWindowedSerde(JsonSerde(CountableEvent::class.java), Duration.ofDays(1).toMillis()),
        Serdes.Long())
    kStreamBuilder.addGlobalStore(storeBuilder, "topic1-count", storeConsumed, passThroughProcessorSupplier)
}
{code}
While sending to "topic1-count", the key is serialized with [TimeWindowedSerializer|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java], which uses [WindowKeySchema.toBinary|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L112], so the message key format is:
{code:java}
real_grouping_key + timestamp(8bytes){code}
 

Everything works and I can get correct values from the state store. But in a recovery scenario, when [GlobalStateManagerImpl|https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L317] enters the offset < highWatermark loop, the [InMemoryWindowStore stateRestoreCallback|https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java#L105] reads from "topic1-count" and fails to extract a valid key and timestamp using [WindowKeySchema.extractStoreKeyBytes|https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L188] and [WindowKeySchema.extractStoreTimestamp|https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L201]. It fails because it expects the format:
{code:java}
real_grouping_key + timestamp(8bytes) + sequence_number(4bytes) {code}
How is this supposed to work in this case?
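
For illustration, a minimal Java sketch of the two key layouts in question (the real logic lives in WindowKeySchema; this is only a rendering of the formats described above):
{code:java}
import java.nio.ByteBuffer;

final class WindowKeyLayoutSketch {
    // layout produced by TimeWindowedSerializer via WindowKeySchema.toBinary
    static byte[] changelogKey(final byte[] key, final long windowStartMs) {
        return ByteBuffer.allocate(key.length + 8)
            .put(key)
            .putLong(windowStartMs)
            .array();
    }

    // layout expected by WindowKeySchema.extractStoreKeyBytes / extractStoreTimestamp
    static byte[] storeKey(final byte[] key, final long windowStartMs, final int seqnum) {
        return ByteBuffer.allocate(key.length + 8 + 4)
            .put(key)
            .putLong(windowStartMs)
            .putInt(seqnum)
            .array();
    }
}
{code}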



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-6733) Support of printing additional ConsumerRecord fields in DefaultMessageFormatter

2020-07-29 Thread Badai Aqrandista (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167207#comment-17167207
 ] 

Badai Aqrandista commented on KAFKA-6733:
-

I did a bad merge on PR 8909. So I've closed it.

I created a new PR 9099 that contains the changes against the latest trunk. It is ready for review: https://github.com/apache/kafka/pull/9099

> Support of printing additional ConsumerRecord fields in 
> DefaultMessageFormatter
> ---
>
> Key: KAFKA-6733
> URL: https://issues.apache.org/jira/browse/KAFKA-6733
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Mateusz Zakarczemny
>Assignee: Badai Aqrandista
>Priority: Minor
>
> It would be useful to have possibility of printing headers, partition and 
> offset in ConsoleConsumer. Especially support of headers seems to be missing.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10323) NullPointerException during rebalance

2020-07-29 Thread yazgoo (Jira)
yazgoo created KAFKA-10323:
--

 Summary: NullPointerException during rebalance
 Key: KAFKA-10323
 URL: https://issues.apache.org/jira/browse/KAFKA-10323
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.5.0
Reporter: yazgoo


*confluent platform version: 5.5.0-ccs*

connector used: s3

Connector stops after rebalancing:

ERROR [Worker clientId=connect-1, groupId=connect] Couldn't instantiate task 
 because it has an invalid task configuration. This task will not 
execute until reconfigured.

java.lang.NullPointerException
 at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:427)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1147)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:126)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1162)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1158)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9273) Refactor AbstractJoinIntegrationTest and Sub-classes

2020-07-29 Thread Albert Lowis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167321#comment-17167321
 ] 

Albert Lowis commented on KAFKA-9273:
-

Hi [~bbejeck], [~sujayopensource], I am a newbie looking to contribute.

Can I take up this task? I see that it has been some time since the last 
activity.

 

Thank you,

Albert

> Refactor AbstractJoinIntegrationTest and Sub-classes
> 
>
> Key: KAFKA-9273
> URL: https://issues.apache.org/jira/browse/KAFKA-9273
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Sujay Hegde
>Priority: Major
>  Labels: newbie
>
> The AbstractJoinIntegrationTest uses an embedded broker, but not all the 
> sub-classes require the use of an embedded broker anymore.  Additionally, 
> there are two tests remaining that require an embedded broker, but they don't 
> perform joins; they are tests validating other conditions, so ideally those 
> tests should move into a separate test



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9273) Refactor AbstractJoinIntegrationTest and Sub-classes

2020-07-29 Thread Bill Bejeck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167349#comment-17167349
 ] 

Bill Bejeck commented on KAFKA-9273:


Hi [~albert02lowis],

 

Thanks for your interest.  It looks like [~sujayopensource] has not started 
work on this ticket yet.  I'd give it another day or so for a response, and if you 
don't hear anything back, feel free to pick this ticket up.

I've taken the liberty of adding you to the contributors list, so you should be 
able to self-assign this ticket and any others in the future.

 

-Bill

> Refactor AbstractJoinIntegrationTest and Sub-classes
> 
>
> Key: KAFKA-9273
> URL: https://issues.apache.org/jira/browse/KAFKA-9273
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Sujay Hegde
>Priority: Major
>  Labels: newbie
>
> The AbstractJoinIntegrationTest uses an embedded broker, but not all the 
> sub-classes require the use of an embedded broker anymore.  Additionally, 
> there are two tests remaining that require an embedded broker, but they don't 
> perform joins; they are tests validating other conditions, so ideally those 
> tests should move into a separate test



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format

2020-07-29 Thread Tommy Becker (Jira)
Tommy Becker created KAFKA-10324:


 Summary: Pre-0.11 consumers can get stuck when messages are 
downconverted from V2 format
 Key: KAFKA-10324
 URL: https://issues.apache.org/jira/browse/KAFKA-10324
 Project: Kafka
  Issue Type: Bug
Reporter: Tommy Becker


As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset 
even if that offset gets removed due to log compaction. If a pre-0.11 consumer 
seeks to such an offset and issues a fetch, it will get an empty batch, since 
offsets prior to the requested one are filtered out during down-conversion. 
KAFKA-5443 added consumer-side logic to advance the fetch offset in this case, 
but this leaves old consumers unable to consume these topics.

The exact behavior varies depending on consumer version. The 0.10.0.0 consumer 
throws RecordTooLargeException and dies, believing that the record must not 
have been returned because it was too large. The 0.10.1.0 consumer simply spins 
fetching the same empty batch over and over.
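
Below is a simplified sketch, under stated assumptions, of the kind of
client-side work-around KAFKA-5443 added to newer consumers and that pre-0.11
consumers lack (this is not the actual Fetcher code; the class and method names
are made up): when a fetched batch yields no records but reports a lastOffset at
or beyond the current position, the position is advanced past the batch instead
of re-fetching the same empty batch forever.
{code:java}
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;

public class SkipEmptyBatches {
    static long nextFetchOffset(Records records, long fetchOffset) {
        long position = fetchOffset;
        for (RecordBatch batch : records.batches()) {
            boolean empty = true;
            for (Record record : batch) {
                empty = false;
                position = record.offset() + 1;    // normal case: consume and advance
            }
            if (empty && batch.lastOffset() >= position) {
                position = batch.lastOffset() + 1; // compacted-away tail offset: skip the batch
            }
        }
        return position;
    }
}
{code}
An old consumer that simply retries the same fetch offset never makes this jump,
which matches the behavior described above.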



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format

2020-07-29 Thread Tommy Becker (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167357#comment-17167357
 ] 

Tommy Becker commented on KAFKA-10324:
--

We have some legacy applications, whose consumer versions are not easily 
upgraded, hitting this issue. It's hard to diagnose because the consumers do 
not give a proper error message (or indeed any message, in the case of the 
0.10.1.0 consumer) and because it depends on the way messages are batched, 
which is opaque to clients.

> Pre-0.11 consumers can get stuck when messages are downconverted from V2 
> format
> ---
>
> Key: KAFKA-10324
> URL: https://issues.apache.org/jira/browse/KAFKA-10324
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tommy Becker
>Priority: Major
>
> As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset 
> even if that offset gets removed due to log compaction. If a pre-0.11 
> consumer seeks to such an offset and issues a fetch, it will get an empty 
> batch, since offsets prior to the requested one are filtered out during 
> down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch 
> offset in this case, but this leaves old consumers unable to consume these 
> topics.
> The exact behavior varies depending on consumer version. The 0.10.0.0 
> consumer throws RecordTooLargeException and dies, believing that the record 
> must not have been returned because it was too large. The 0.10.1.0 consumer 
> simply spins fetching the same empty batch over and over.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format

2020-07-29 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167369#comment-17167369
 ] 

Ismael Juma commented on KAFKA-10324:
-

Thanks for the report. Looks like this issue was introduced in 0.11.0.0, would 
you agree? Interesting that no-one reported it for so long. Out of curiosity, 
the old consumers are Java consumers or other languages?

> Pre-0.11 consumers can get stuck when messages are downconverted from V2 
> format
> ---
>
> Key: KAFKA-10324
> URL: https://issues.apache.org/jira/browse/KAFKA-10324
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tommy Becker
>Priority: Major
>
> As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset 
> even if that offset gets removed due to log compaction. If a pre-0.11 
> consumer seeks to such an offset and issues a fetch, it will get an empty 
> batch, since offsets prior to the requested one are filtered out during 
> down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch 
> offset in this case, but this leaves old consumers unable to consume these 
> topics.
> The exact behavior varies depending on consumer version. The 0.10.0.0 
> consumer throws RecordTooLargeException and dies, believing that the record 
> must not have been returned because it was too large. The 0.10.1.0 consumer 
> simply spins fetching the same empty batch over and over.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format

2020-07-29 Thread Tommy Becker (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167371#comment-17167371
 ] 

Tommy Becker commented on KAFKA-10324:
--

Thanks for the response [~ijuma]. Yes, these are Java consumers, and I agree 
it's odd that this has not been found before now. I have limited familiarity 
with the code base, so it's possible I'm missing something but I believe the 
issue is as I described. In my tests I'm trying to consume a 25GB topic and 
have found 2 distinct offsets which the consumer cannot advance beyond, and 
they are both cases where the offset:
 # Is the lastOffset in the last batch of its log segment
 # Does not actually exist, presumably due to log compaction.
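
As a rough sketch of how to spot such batches in a segment (the file name is a
placeholder and this is just one possible way to read it with the clients
library, not a prescribed procedure):
{code:java}
import java.io.File;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;

public class FindCompactedTailOffsets {
    public static void main(String[] args) throws Exception {
        // placeholder segment file name
        FileRecords segment = FileRecords.open(new File("00000000000000000000.log"));
        try {
            for (RecordBatch batch : segment.batches()) {
                long maxRecordOffset = -1L;
                for (Record record : batch) {
                    maxRecordOffset = Math.max(maxRecordOffset, record.offset());
                }
                // a batch whose lastOffset has no matching record is the case described above
                if (maxRecordOffset < batch.lastOffset()) {
                    System.out.printf("batch base=%d last=%d: tail offset(s) removed by compaction%n",
                        batch.baseOffset(), batch.lastOffset());
                }
            }
        } finally {
            segment.close();
        }
    }
}
{code}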

 

> Pre-0.11 consumers can get stuck when messages are downconverted from V2 
> format
> ---
>
> Key: KAFKA-10324
> URL: https://issues.apache.org/jira/browse/KAFKA-10324
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tommy Becker
>Priority: Major
>
> As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset 
> even if that offset gets removed due to log compaction. If a pre-0.11 
> consumer seeks to such an offset and issues a fetch, it will get an empty 
> batch, since offsets prior to the requested one are filtered out during 
> down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch 
> offset in this case, but this leaves old consumers unable to consume these 
> topics.
> The exact behavior varies depending on consumer version. The 0.10.0.0 
> consumer throws RecordTooLargeException and dies, believing that the record 
> must not have been returned because it was too large. The 0.10.1.0 consumer 
> simply spins fetching the same empty batch over and over.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format

2020-07-29 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167387#comment-17167387
 ] 

Jason Gustafson commented on KAFKA-10324:
-

[~twbecker] Thanks for the report. I'm trying to understand why the fetch is 
not including batches beyond the one with the last offset removed. Is that 
because the batch itself is already satisfying the fetch max bytes? It would be 
helpful to include a snippet from a dump of the log with the batch that is 
causing problems.

> Pre-0.11 consumers can get stuck when messages are downconverted from V2 
> format
> ---
>
> Key: KAFKA-10324
> URL: https://issues.apache.org/jira/browse/KAFKA-10324
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tommy Becker
>Priority: Major
>
> As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset 
> even if that offset gets removed due to log compaction. If a pre-0.11 
> consumer seeks to such an offset and issues a fetch, it will get an empty 
> batch, since offsets prior to the requested one are filtered out during 
> down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch 
> offset in this case, but this leaves old consumers unable to consume these 
> topics.
> The exact behavior varies depending on consumer version. The 0.10.0.0 
> consumer throws RecordTooLargeException and dies, believing that the record 
> must not have been returned because it was too large. The 0.10.1.0 consumer 
> simply spins fetching the same empty batch over and over.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] gwenshap commented on pull request #9054: KAFKA-10282: Remove Log metrics immediately when deleting log

2020-07-29 Thread GitBox


gwenshap commented on pull request #9054:
URL: https://github.com/apache/kafka/pull/9054#issuecomment-665773125


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] soarez commented on pull request #9000: KAFKA-10036 Improve handling and documentation of Suppliers

2020-07-29 Thread GitBox


soarez commented on pull request #9000:
URL: https://github.com/apache/kafka/pull/9000#issuecomment-665668720


   @mjsax what can we do to proceed?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] badaiaqrandista commented on pull request #4807: KAFKA-6733: Support of printing additional ConsumerRecord fields in DefaultMessageFormatter

2020-07-29 Thread GitBox


badaiaqrandista commented on pull request #4807:
URL: https://github.com/apache/kafka/pull/4807#issuecomment-665660018


   Closing this PR as it has been superseded by PR 9099 
(https://github.com/apache/kafka/pull/9099).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9096: MINOR: Add comments to constrainedAssign and generalAssign method

2020-07-29 Thread GitBox


chia7712 commented on pull request #9096:
URL: https://github.com/apache/kafka/pull/9096#issuecomment-665504255


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9012: KAFKA-10270: A broker to controller channel manager

2020-07-29 Thread GitBox


mumrah commented on a change in pull request #9012:
URL: https://github.com/apache/kafka/pull/9012#discussion_r462352682



##
File path: 
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.{LinkedBlockingDeque, TimeUnit}
+
+import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import kafka.utils.Logging
+import org.apache.kafka.clients._
+import org.apache.kafka.common.requests.AbstractRequest
+import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.common.Node
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network._
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.security.JaasContext
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * This class manages the connection between a broker and the controller. It 
runs a single
+ * {@link BrokerToControllerRequestThread} which uses the broker's metadata 
cache as its own metadata to find
+ * and connect to the controller. The channel is async and runs the network 
connection in the background.
+ * The maximum number of in-flight requests are set to one to ensure orderly 
response from the controller, therefore
+ * care must be taken to not block on outstanding requests for too long.
+ */
+class BrokerToControllerChannelManager(metadataCache: 
kafka.server.MetadataCache,
+   time: Time,
+   metrics: Metrics,
+   config: KafkaConfig,
+   threadNamePrefix: Option[String] = 
None) extends Logging {
+  private val requestQueue = new 
LinkedBlockingDeque[BrokerToControllerQueueItem]
+  private val logContext = new 
LogContext(s"[broker-${config.brokerId}-to-controller] ")
+  private val manualMetadataUpdater = new ManualMetadataUpdater()
+  private val requestThread = newRequestThread
+
+  def start(): Unit = {
+requestThread.start()
+  }
+
+  def shutdown(): Unit = {
+requestThread.shutdown()
+requestThread.awaitShutdown()
+  }
+
+  private[server] def newRequestThread = {
+val brokerToControllerListenerName = 
config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName)
+val brokerToControllerSecurityProtocol = 
config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol)
+
+val networkClient = {
+  val channelBuilder = ChannelBuilders.clientChannelBuilder(
+brokerToControllerSecurityProtocol,
+JaasContext.Type.SERVER,
+config,
+brokerToControllerListenerName,
+config.saslMechanismInterBrokerProtocol,
+time,
+config.saslInterBrokerHandshakeRequestEnable,
+logContext
+  )
+  val selector = new Selector(
+NetworkReceive.UNLIMITED,
+Selector.NO_IDLE_TIMEOUT_MS,
+metrics,
+time,
+"BrokerToControllerChannel",
+Map("BrokerId" -> config.brokerId.toString).asJava,
+false,
+channelBuilder,
+logContext
+  )
+  new NetworkClient(
+selector,
+manualMetadataUpdater,
+config.brokerId.toString,
+1,
+0,
+0,
+Selectable.USE_DEFAULT_BUFFER_SIZE,
+Selectable.USE_DEFAULT_BUFFER_SIZE,
+config.requestTimeoutMs,
+config.connectionSetupTimeoutMs,
+config.connectionSetupTimeoutMaxMs,
+ClientDnsLookup.USE_ALL_DNS_IPS,
+time,
+false,
+new ApiVersions,
+logContext
+  )
+}
+val threadName = threadNamePrefix match {
+  case None => s"broker-${config.brokerId}-to-controller-send-thread"
+  case Some(name) => 
s"$name:broker-${config.brokerId}-to-controller-send-thread"
+}
+
+new BrokerToControllerRequestThread(networkClient, manualMetadataUpdater, 
requestQueue, metadataCache, config,
+  brokerToControllerListenerName, time, threadName)
+  }
+
+  private[server] def sendRequest(request: AbstractRequest.Builder[_ <: 
AbstractRequest]

[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

2020-07-29 Thread GitBox


mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r462364719



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##
@@ -366,225 +255,128 @@ public FetchResponse(Errors error,
  LinkedHashMap> 
responseData,
  int throttleTimeMs,
  int sessionId) {
-this.error = error;
-this.responseData = responseData;
-this.throttleTimeMs = throttleTimeMs;
-this.sessionId = sessionId;
+this.data = toMessage(throttleTimeMs, error, 
responseData.entrySet().iterator(), sessionId);
+this.responseDataMap = responseData;
 }
 
-public static FetchResponse parse(Struct struct) {
-LinkedHashMap> 
responseData = new LinkedHashMap<>();
-for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
-Struct topicResponse = (Struct) topicResponseObj;
-String topic = topicResponse.get(TOPIC_NAME);
-for (Object partitionResponseObj : 
topicResponse.getArray(PARTITIONS_KEY_NAME)) {
-Struct partitionResponse = (Struct) partitionResponseObj;
-Struct partitionResponseHeader = 
partitionResponse.getStruct(PARTITION_HEADER_KEY_NAME);
-int partition = partitionResponseHeader.get(PARTITION_ID);
-Errors error = 
Errors.forCode(partitionResponseHeader.get(ERROR_CODE));
-long highWatermark = 
partitionResponseHeader.get(HIGH_WATERMARK);
-long lastStableOffset = 
partitionResponseHeader.getOrElse(LAST_STABLE_OFFSET, 
INVALID_LAST_STABLE_OFFSET);
-long logStartOffset = 
partitionResponseHeader.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET);
-Optional preferredReadReplica = Optional.of(
-partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, 
INVALID_PREFERRED_REPLICA_ID)
-
).filter(Predicate.isEqual(INVALID_PREFERRED_REPLICA_ID).negate());
-
-BaseRecords baseRecords = 
partitionResponse.getRecords(RECORD_SET_KEY_NAME);
-if (!(baseRecords instanceof MemoryRecords))
-throw new IllegalStateException("Unknown records type 
found: " + baseRecords.getClass());
-MemoryRecords records = (MemoryRecords) baseRecords;
-
-List abortedTransactions = null;
-if 
(partitionResponseHeader.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) {
-Object[] abortedTransactionsArray = 
partitionResponseHeader.getArray(ABORTED_TRANSACTIONS_KEY_NAME);
-if (abortedTransactionsArray != null) {
-abortedTransactions = new 
ArrayList<>(abortedTransactionsArray.length);
-for (Object abortedTransactionObj : 
abortedTransactionsArray) {
-Struct abortedTransactionStruct = (Struct) 
abortedTransactionObj;
-long producerId = 
abortedTransactionStruct.get(PRODUCER_ID);
-long firstOffset = 
abortedTransactionStruct.get(FIRST_OFFSET);
-abortedTransactions.add(new 
AbortedTransaction(producerId, firstOffset));
-}
-}
-}
-
-PartitionData partitionData = new 
PartitionData<>(error, highWatermark, lastStableOffset,
-logStartOffset, preferredReadReplica, 
abortedTransactions, records);
-responseData.put(new TopicPartition(topic, partition), 
partitionData);
-}
-}
-return new FetchResponse<>(Errors.forCode(struct.getOrElse(ERROR_CODE, 
(short) 0)), responseData,
-struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME), 
struct.getOrElse(SESSION_ID, INVALID_SESSION_ID));
+public FetchResponse(FetchResponseData fetchResponseData) {
+this.data = fetchResponseData;
+this.responseDataMap = toResponseDataMap(fetchResponseData);
 }
 
 @Override
 public Struct toStruct(short version) {
-return toStruct(version, throttleTimeMs, error, 
responseData.entrySet().iterator(), sessionId);
+return data.toStruct(version);
 }
 
 @Override
-protected Send toSend(String dest, ResponseHeader responseHeader, short 
apiVersion) {
-Struct responseHeaderStruct = responseHeader.toStruct();
-Struct responseBodyStruct = toStruct(apiVersion);
-
-// write the total size and the response header
-ByteBuffer buffer = ByteBuffer.allocate(responseHeaderStruct.sizeOf() 
+ 4);
-buffer.putInt(responseHeaderStruct.sizeOf() + 
responseBodyStruct.sizeOf());
-responseHeaderStruct.writeTo(buffer);
+public Send toSend(String dest, ResponseHeader responseHeader, short 
apiVersion) {
+// Generate the Sends for 

[GitHub] [kafka] abbccdda commented on pull request #9012: KAFKA-10270: A broker to controller channel manager

2020-07-29 Thread GitBox


abbccdda commented on pull request #9012:
URL: https://github.com/apache/kafka/pull/9012#issuecomment-665757647







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #9087: HOTFIX: Set session timeout and heartbeat interval to default to decrease flakiness

2020-07-29 Thread GitBox


guozhangwang commented on pull request #9087:
URL: https://github.com/apache/kafka/pull/9087#issuecomment-665779451







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] badaiaqrandista opened a new pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

2020-07-29 Thread GitBox


badaiaqrandista opened a new pull request #9099:
URL: https://github.com/apache/kafka/pull/9099


   Implementation of KIP-431 - Support of printing additional ConsumerRecord 
fields in DefaultMessageFormatter
   
   
https://cwiki.apache.org/confluence/display/KAFKA/KIP-431%3A+Support+of+printing+additional+ConsumerRecord+fields+in+DefaultMessageFormatter
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics

2020-07-29 Thread GitBox


cadonna commented on a change in pull request #9094:
URL: https://github.com/apache/kafka/pull/9094#discussion_r462327050



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -227,6 +234,14 @@ protected Bytes keyBytes(final K key) {
 return byteEntries;
 }
 
+private void maybeRecordE2ELatency() {
+if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {

Review comment:
   I think you do not need to check for metrics with 
`e2eLatencySensor.hasMetrics()`. There should always be metrics within this 
sensor.
   `hasMetrics()` is used in `StreamsMetricsImpl#maybeMeasureLatency()` because 
some sensors may not contain any metrics due to the built-in metrics version. 
For instance, the destroy sensor exists for built-in metrics version 0.10.0-2.4 
but not for latest. To avoid version checks in the record processing code, we 
just create an empty sensor and call record on it effectively not recording any 
metrics for this sensor for version latest.
   We do not hide newly added metrics if the built-in version is set to an 
older version.
   Same applies to the other uses of `hasMetrics()` introduced in this PR.
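   
   As a minimal illustration of that pattern (assumed names, not the real 
StreamsMetricsImpl code): the version check happens once when the sensor is 
built, and the hot path can call record() unconditionally because a sensor with 
no metrics attached records nothing.
   
       import org.apache.kafka.common.metrics.Metrics;
       import org.apache.kafka.common.metrics.Sensor;
       import org.apache.kafka.common.metrics.stats.Avg;
       
       public class EmptySensorPattern {
           static Sensor destroySensor(Metrics metrics, String builtInMetricsVersion) {
               Sensor sensor = metrics.sensor("destroy-latency");
               if (!"latest".equals(builtInMetricsVersion)) {
                   // only the older built-in metrics versions expose this metric
                   sensor.add(metrics.metricName("destroy-latency-avg", "stream-task-metrics"), new Avg());
               }
               return sensor; // for "latest" the sensor stays empty
           }
       
           public static void main(String[] args) {
               Metrics metrics = new Metrics();
               Sensor sensor = destroySensor(metrics, "latest");
               sensor.record(42.0); // effectively a no-op: no metrics registered on this sensor
               metrics.close();
           }
       }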

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
##
@@ -248,4 +253,12 @@ public void close() {
 private Bytes keyBytes(final K key) {
 return Bytes.wrap(serdes.rawKey(key));
 }
+
+private void maybeRecordE2ELatency() {
+if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {

Review comment:
   Your approach makes sense to me. I agree that the latency should refer 
to the update in the state store and not to the record itself. If a record updates 
the state more than once, then latency should be measured each time.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
##
@@ -443,6 +447,25 @@ public static Sensor suppressionBufferSizeSensor(final 
String threadId,
 );
 }
 
+public static Sensor e2ELatencySensor(final String threadId,
+  final String taskId,
+  final String storeType,
+  final String storeName,
+  final StreamsMetricsImpl 
streamsMetrics) {
+final Sensor sensor = streamsMetrics.storeLevelSensor(threadId, 
taskId, storeName, RECORD_E2E_LATENCY, RecordingLevel.TRACE);
+final Map tagMap = 
streamsMetrics.storeLevelTagMap(threadId, taskId, storeType, storeName);
+addAvgAndMinAndMaxToSensor(
+sensor,
+STATE_STORE_LEVEL_GROUP,

Review comment:
   You need to use the `stateStoreLevelGroup()` here instead of 
`STATE_STORE_LEVEL_GROUP` because the group name depends on the version and the 
store type.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
##
@@ -668,6 +671,9 @@ private void checkKeyValueStoreMetrics(final String 
group0100To24,
 checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_CURRENT, 0);
 checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_AVG, 0);
 checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_MAX, 0);
+checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_AVG, 
expectedNumberofE2ELatencyMetrics);
+checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MIN, 
expectedNumberofE2ELatencyMetrics);
+checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MAX, 
expectedNumberofE2ELatencyMetrics);

Review comment:
   I agree with you, it should always be 1. It is the group of the metrics. 
See my comment in `StateStoreMetrics`. I am glad this test served its purpose, 
because I did not notice this in the unit tests! 

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##
@@ -468,38 +468,44 @@ public void 
shouldRecordE2ELatencyOnProcessForSourceNodes() {
 }
 
 @Test
-public void shouldRecordE2ELatencyMinAndMax() {
+public void shouldRecordE2ELatencyAvgAndMinAndMax() {
 time = new MockTime(0L, 0L, 0L);
 metrics = new Metrics(new 
MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), time);
 task = createStatelessTask(createConfig(false, "0"), 
StreamsConfig.METRICS_LATEST);
 
 final String sourceNode = source1.name();
 
-final Metric maxMetric = getProcessorMetric("record-e2e-latency", 
"%s-max", task.id().toString(), sourceNode, StreamsConfig.METRICS_LATEST);
+final Metric avgMetric = getProcessorMetric("record-e2e-latency", 
"%s-avg", task.id().toString(), sourceNode, StreamsConfig.METRICS_LATEST);
 final Metric minMetric = getProcessorMetric("record-e2e-latency", 
"%s-min", ta

[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format

2020-07-29 Thread Tommy Becker (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167400#comment-17167400
 ] 

Tommy Becker commented on KAFKA-10324:
--

[~hachikuji] wrt why the broker does not seem to send subsequent batches, I'm 
not sure. But I can tell you I see this behavior even with 
max.partition.fetch.bytes set to Integer.MAX_VALUE. Maybe this has something to 
do with down conversion?  Anyway, here's an excerpt from a dump of the segment 
containing the problematic offset, which is 13920987:

 

baseOffset: 13920966 lastOffset: 13920987 count: 6 baseSequence: -1 
lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 49 
isTransactional: false isControl: false position: 98516844 CreateTime: 
1595224747691 size: 4407 magic: 2 compresscodec: NONE crc: 1598305187 isvalid: 
true
| offset: 13920978 CreateTime: 1595224747691 keysize: 36 valuesize: 681 
sequence: -1 headerKeys: []
| offset: 13920979 CreateTime: 1595224747691 keysize: 36 valuesize: 677 
sequence: -1 headerKeys: []
| offset: 13920980 CreateTime: 1595224747691 keysize: 36 valuesize: 680 
sequence: -1 headerKeys: []
| offset: 13920984 CreateTime: 1595224747691 keysize: 36 valuesize: 681 
sequence: -1 headerKeys: []
| offset: 13920985 CreateTime: 1595224747691 keysize: 36 valuesize: 677 
sequence: -1 headerKeys: []
| offset: 13920986 CreateTime: 1595224747691 keysize: 36 valuesize: 680 
sequence: -1 headerKeys: []

### End of segment is here

 

 

> Pre-0.11 consumers can get stuck when messages are downconverted from V2 
> format
> ---
>
> Key: KAFKA-10324
> URL: https://issues.apache.org/jira/browse/KAFKA-10324
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tommy Becker
>Priority: Major
>
> As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset 
> even if that offset gets removed due to log compaction. If a pre-0.11 
> consumer seeks to such an offset and issues a fetch, it will get an empty 
> batch, since offsets prior to the requested one are filtered out during 
> down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch 
> offset in this case, but this leaves old consumers unable to consume these 
> topics.
> The exact behavior varies depending on consumer version. The 0.10.0.0 
> consumer throws RecordTooLargeException and dies, believing that the record 
> must not have been returned because it was too large. The 0.10.1.0 consumer 
> simply spins fetching the same empty batch over and over.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mumrah commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

2020-07-29 Thread GitBox


mumrah commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-665795947


   I ran a console consumer perf test (at @hachikuji's suggestion) and took a 
profile. 
   
   
![image](https://user-images.githubusercontent.com/55116/88832229-81be6d00-d19e-11ea-9ee9-51b6054a6731.png)
   
   Zoomed in a bit on the records part:
   
   
![image](https://user-images.githubusercontent.com/55116/88832276-93a01000-d19e-11ea-9293-a138c38f6ed3.png)
   
   This was with only a handful of partitions on a single broker (on my 
laptop), but it confirms that the new FetchResponse serialization is hitting 
the same sendfile path as the previous code.
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format

2020-07-29 Thread Tommy Becker (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167400#comment-17167400
 ] 

Tommy Becker edited comment on KAFKA-10324 at 7/29/20, 5:46 PM:


[~hachikuji] wrt why the broker does not seem to send subsequent batches, I'm 
not sure. But I can tell you I see this behavior even with 
max.partition.fetch.bytes set to Integer.MAX_VALUE. Maybe this has something to 
do with down conversion?  Anyway, here's an excerpt from a dump of the segment 
containing the problematic offset, which is 13920987:

 

baseOffset: 13920966 lastOffset: 13920987 count: 6 baseSequence: -1 
lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 49 
isTransactional: false isControl: false position: 98516844 CreateTime: 
1595224747691 size: 4407 magic: 2 compresscodec: NONE crc: 1598305187 isvalid: 
true
| offset: 13920978 CreateTime: 1595224747691 keysize: 36 valuesize: 681 
sequence: -1 headerKeys: []
| offset: 13920979 CreateTime: 1595224747691 keysize: 36 valuesize: 677 
sequence: -1 headerKeys: []
| offset: 13920980 CreateTime: 1595224747691 keysize: 36 valuesize: 680 
sequence: -1 headerKeys: []
| offset: 13920984 CreateTime: 1595224747691 keysize: 36 valuesize: 681 
sequence: -1 headerKeys: []
| offset: 13920985 CreateTime: 1595224747691 keysize: 36 valuesize: 677 
sequence: -1 headerKeys: []
| offset: 13920986 CreateTime: 1595224747691 keysize: 36 valuesize: 680 
sequence: -1 headerKeys: []
End of segment is here

 

 


was (Author: twbecker):
[~hachikuji] wrt why the broker does not seem to send subsequent batches, I'm 
not sure. But I can tell you I see this behavior even with 
max.partition.fetch.bytes set to Integer.MAX_VALUE. Maybe this has something to 
do with down conversion?  Anyway, here's an excerpt from a dump of the segment 
containing the problematic offset, which is 13920987:

 

baseOffset: 13920966 lastOffset: 13920987 count: 6 baseSequence: -1 
lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 49 
isTransactional: false isControl: false position: 98516844 CreateTime: 
1595224747691 size: 4407 magic: 2 compresscodec: NONE crc: 1598305187 isvalid: 
true
| offset: 13920978 CreateTime: 1595224747691 keysize: 36 valuesize: 681 
sequence: -1 headerKeys: []
| offset: 13920979 CreateTime: 1595224747691 keysize: 36 valuesize: 677 
sequence: -1 headerKeys: []
| offset: 13920980 CreateTime: 1595224747691 keysize: 36 valuesize: 680 
sequence: -1 headerKeys: []
| offset: 13920984 CreateTime: 1595224747691 keysize: 36 valuesize: 681 
sequence: -1 headerKeys: []
| offset: 13920985 CreateTime: 1595224747691 keysize: 36 valuesize: 677 
sequence: -1 headerKeys: []
| offset: 13920986 CreateTime: 1595224747691 keysize: 36 valuesize: 680 
sequence: -1 headerKeys: []

### End of segment is here

 

 

> Pre-0.11 consumers can get stuck when messages are downconverted from V2 
> format
> ---
>
> Key: KAFKA-10324
> URL: https://issues.apache.org/jira/browse/KAFKA-10324
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tommy Becker
>Priority: Major
>
> As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset 
> even if that offset gets removed due to log compaction. If a pre-0.11 
> consumer seeks to such an offset and issues a fetch, it will get an empty 
> batch, since offsets prior to the requested one are filtered out during 
> down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch 
> offset in this case, but this leaves old consumers unable to consume these 
> topics.
> The exact behavior varies depending on consumer version. The 0.10.0.0 
> consumer throws RecordTooLargeException and dies, believing that the record 
> must not have been returned because it was too large. The 0.10.1.0 consumer 
> simply spins fetching the same empty batch over and over.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cadonna commented on pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

2020-07-29 Thread GitBox


cadonna commented on pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#issuecomment-665616943


   Call for review: @vvcephei @guozhangwang 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

2020-07-29 Thread GitBox


dajac commented on a change in pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#discussion_r462306541



##
File path: core/src/main/scala/kafka/tools/ConsoleConsumer.scala
##
@@ -459,48 +466,32 @@ class DefaultMessageFormatter extends MessageFormatter {
   var printKey = false
   var printValue = true
   var printPartition = false
-  var keySeparator = "\t".getBytes(StandardCharsets.UTF_8)
-  var lineSeparator = "\n".getBytes(StandardCharsets.UTF_8)
+  var printOffset = false
+  var printHeaders = false
+  var keySeparator = utfBytes("\t")
+  var lineSeparator = utfBytes("\n")
+  var headersSeparator = utfBytes(",")
+  var nullLiteral = utfBytes("null")
 
   var keyDeserializer: Option[Deserializer[_]] = None
   var valueDeserializer: Option[Deserializer[_]] = None
-
-  override def configure(configs: Map[String, _]): Unit = {
-val props = new java.util.Properties()
-configs.asScala.foreach { case (key, value) => props.put(key, 
value.toString) }
-if (props.containsKey("print.timestamp"))
-  printTimestamp = 
props.getProperty("print.timestamp").trim.equalsIgnoreCase("true")
-if (props.containsKey("print.key"))
-  printKey = props.getProperty("print.key").trim.equalsIgnoreCase("true")
-if (props.containsKey("print.value"))
-  printValue = 
props.getProperty("print.value").trim.equalsIgnoreCase("true")
-if (props.containsKey("print.partition"))
-  printPartition = 
props.getProperty("print.partition").trim.equalsIgnoreCase("true")
-if (props.containsKey("key.separator"))
-  keySeparator = 
props.getProperty("key.separator").getBytes(StandardCharsets.UTF_8)
-if (props.containsKey("line.separator"))
-  lineSeparator = 
props.getProperty("line.separator").getBytes(StandardCharsets.UTF_8)
-// Note that `toString` will be called on the instance returned by 
`Deserializer.deserialize`
-if (props.containsKey("key.deserializer")) {
-  keyDeserializer = 
Some(Class.forName(props.getProperty("key.deserializer")).getDeclaredConstructor()
-.newInstance().asInstanceOf[Deserializer[_]])
-  
keyDeserializer.get.configure(propertiesWithKeyPrefixStripped("key.deserializer.",
 props).asScala.asJava, true)
-}
-// Note that `toString` will be called on the instance returned by 
`Deserializer.deserialize`
-if (props.containsKey("value.deserializer")) {
-  valueDeserializer = 
Some(Class.forName(props.getProperty("value.deserializer")).getDeclaredConstructor()
-.newInstance().asInstanceOf[Deserializer[_]])
-  
valueDeserializer.get.configure(propertiesWithKeyPrefixStripped("value.deserializer.",
 props).asScala.asJava, false)
-}
-  }
-
-  private def propertiesWithKeyPrefixStripped(prefix: String, props: 
Properties): Properties = {
-val newProps = new Properties()
-props.asScala.foreach { case (key, value) =>
-  if (key.startsWith(prefix) && key.length > prefix.length)
-newProps.put(key.substring(prefix.length), value)
-}
-newProps
+  var headersDeserializer: Option[Deserializer[_]] = None
+
+  override def init(props: Properties): Unit = {

Review comment:
   `init(props: Properties)` has been deprecated. It would be great if we 
could keep using `configure(configs: Map[String, _])` as before. I think that 
we should also try to directly extract the values from the `Map` instead of 
using a `Properties`.

##
File path: core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala
##
@@ -0,0 +1,235 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package unit.kafka.tools
+
+import java.io.{ByteArrayOutputStream, Closeable, PrintStream}
+import java.nio.charset.StandardCharsets
+import java.util
+import java.util.Properties
+
+import kafka.tools.DefaultMessageFormatter
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.header.Header
+import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders}
+import org.apache.kafka.common.record.TimestampType
+import org.apache.kafka.common.serialization.Deserializer
+import org.junit.Assert._
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.run
