[GitHub] [kafka] ableegoldman commented on a change in pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2022-02-04 Thread GitBox


ableegoldman commented on a change in pull request #11424:
URL: https://github.com/apache/kafka/pull/11424#discussion_r799253840



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java
##
@@ -116,14 +123,30 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo
 maxBufferedSize = getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
 log.info("Topology {} is overriding {} to {}", topologyName, BUFFERED_RECORDS_PER_PARTITION_CONFIG, maxBufferedSize);
 } else {
-maxBufferedSize = globalAppConfigs.getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
+maxBufferedSize = globalAppConfigs.originals().containsKey(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG)
+? globalAppConfigs.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG) : -1;
 }
 
-if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
-cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
-log.info("Topology {} is overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
+if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides) ||
+isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
+
+if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides) && isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
+cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+log.info("Topology {} is using both {} and deprecated config {}. overriding {} to {}",
+topologyName,
+STATESTORE_CACHE_MAX_BYTES_CONFIG,
+CACHE_MAX_BYTES_BUFFERING_CONFIG,
+STATESTORE_CACHE_MAX_BYTES_CONFIG,
+cacheSize);
+} else if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
+cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
+log.info("Topology {} is using deprecated config {}. overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
+} else {
+cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+log.info("Topology {} is overriding {} to {}", topologyName, STATESTORE_CACHE_MAX_BYTES_CONFIG, cacheSize);
+}
 } else {
-cacheSize = globalAppConfigs.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
+cacheSize = globalAppConfigs.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);

Review comment:
   Sounds good! There's no rush, but I'll make sure we have your new PRs 
reviewed and merged quickly whenever they are ready, since you've worked so 
hard on this already. I'm sorry I wasn't able to make another pass on your 
original PR, but hopefully this won't be too much of a bother.
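The hunk above resolves the topology-level cache size by preferring the new `statestore.cache.max.bytes` key, falling back to the deprecated `cache.max.bytes.buffering` key, and otherwise using the application-wide value. A minimal sketch of that precedence, assuming the overrides are available as a plain `Map<String, Long>`; `CacheSizeResolver` and `resolve` are hypothetical names, not Kafka Streams API:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper illustrating override precedence; not part of Kafka Streams.
final class CacheSizeResolver {
    static final String NEW_KEY = "statestore.cache.max.bytes";
    static final String DEPRECATED_KEY = "cache.max.bytes.buffering";

    static long resolve(Map<String, Long> topologyOverrides, long globalValue) {
        if (topologyOverrides.containsKey(NEW_KEY)) {
            // The new config wins, whether or not the deprecated one is also set.
            return topologyOverrides.get(NEW_KEY);
        }
        if (topologyOverrides.containsKey(DEPRECATED_KEY)) {
            // Only the deprecated config is overridden for this topology.
            return topologyOverrides.get(DEPRECATED_KEY);
        }
        // No topology-level override: fall back to the application-wide value.
        return globalValue;
    }

    public static void main(String[] args) {
        Map<String, Long> overrides = new HashMap<>();
        overrides.put(NEW_KEY, 10L);
        overrides.put(DEPRECATED_KEY, 20L);
        System.out.println(resolve(overrides, 99L)); // 10
    }
}
```

Collapsing the three branches of the hunk into two early returns gives the same result, because the "both set" and "only new key set" cases both read the new key.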




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2022-02-04 Thread GitBox


ableegoldman commented on a change in pull request #11424:
URL: https://github.com/apache/kafka/pull/11424#discussion_r799261397



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1034,14 +1037,15 @@ private static Metrics getMetrics(final StreamsConfig config, final Time time, f
 final StreamThread streamThread;
 synchronized (changeThreadCount) {
 final int threadIdx = getNextThreadIndex();
-final int numLiveThreads = getNumLiveStreamThreads();
-final long cacheSizePerThread = getCacheSizePerThread(numLiveThreads + 1);
-log.info("Adding StreamThread-{}, there will now be {} live threads and the new cache size per thread is {}",
- threadIdx, numLiveThreads + 1, cacheSizePerThread);
-resizeThreadCache(cacheSizePerThread);
 // Creating thread should hold the lock in order to avoid duplicate thread index.
 // If the duplicate index happen, the metadata of thread may be duplicate too.
-streamThread = createAndAddStreamThread(cacheSizePerThread, threadIdx);
+// Also, we create the new thread with initial values of cache size and max buffer size as 0
+// and then resize them later
+streamThread = createAndAddStreamThread(0L, 0L, threadIdx);
+final int numLiveThreads = getNumLiveStreamThreads();
+resizeThreadCacheAndBufferMemory(numLiveThreads + 1);

Review comment:
   One more thing: @wcarlson5 mentioned there was an off-by-one error here due to the reordering of these calls. Specifically, the `+ 1` in this line was necessary before because we called `resizeThreadCache` before actually adding the new thread, so we had to account for the new thread by adding one. But since we now create and add the new thread first, `getNumLiveStreamThreads` will already return the correct number of threads, so we don't need the `+ 1` anymore.
   
   On that note, I take it we reordered these calls because we now create the thread without the cache value and then call `resize` to set the cache after the thread has been created. Why do we need this post-construction resizing? I only looked at this part of the PR briefly, but it seems to me that we always know the actual cache size when we're creating the thread, so can't we just pass it in to the StreamThread#create method/constructor? Initializing the cache size to a placeholder value (0 here) is a bit confusing; it took me a little while to figure out what was going on.
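The off-by-one can be seen in a simplified model where the per-thread cache is the total cache divided evenly among live threads. This sketch ignores everything else `KafkaStreams` tracks, and all names in it are hypothetical; it shows that once the thread is added before resizing, keeping the `+ 1` counts the new thread twice and shrinks every thread's share:

```java
// Simplified, hypothetical model of splitting a fixed cache budget across stream threads.
final class ThreadCacheSketch {
    private final long totalCacheBytes;
    private int liveThreads;

    ThreadCacheSketch(long totalCacheBytes, int liveThreads) {
        this.totalCacheBytes = totalCacheBytes;
        this.liveThreads = liveThreads;
    }

    long cacheSizePerThread(int numThreads) {
        // Avoid dividing by zero when no thread is running.
        return numThreads == 0 ? totalCacheBytes : totalCacheBytes / numThreads;
    }

    // Old ordering: size computed before the thread exists, so the new thread is counted via "+ 1".
    long resizeThenAdd() {
        long perThread = cacheSizePerThread(liveThreads + 1);
        liveThreads++;
        return perThread;
    }

    // New ordering: the thread is already counted, so no adjustment is needed.
    long addThenResize() {
        liveThreads++;
        return cacheSizePerThread(liveThreads);
    }

    // New ordering with the leftover "+ 1": the new thread is counted twice.
    long addThenResizeOffByOne() {
        liveThreads++;
        return cacheSizePerThread(liveThreads + 1);
    }

    public static void main(String[] args) {
        System.out.println(new ThreadCacheSketch(1200L, 2).resizeThenAdd());         // 400
        System.out.println(new ThreadCacheSketch(1200L, 2).addThenResize());         // 400
        System.out.println(new ThreadCacheSketch(1200L, 2).addThenResizeOffByOne()); // 300
    }
}
```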








[GitHub] [kafka] predatorray commented on pull request #10525: KAFKA-7572: Producer should not send requests with negative partition id

2022-02-04 Thread GitBox


predatorray commented on pull request #10525:
URL: https://github.com/apache/kafka/pull/10525#issuecomment-1029767796


   @guozhangwang Thanks for the reply and the review. The change was rebased.






[GitHub] [kafka] satishd commented on a change in pull request #11390: [KAFKA-13369] Follower fetch protocol changes for tiered storage.

2022-02-04 Thread GitBox


satishd commented on a change in pull request #11390:
URL: https://github.com/apache/kafka/pull/11390#discussion_r796465473



##
File path: 
clients/src/main/java/org/apache/kafka/common/record/RemoteLogInputStream.java
##
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import static org.apache.kafka.common.record.Records.HEADER_SIZE_UP_TO_MAGIC;
+import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
+import static org.apache.kafka.common.record.Records.MAGIC_OFFSET;
+import static org.apache.kafka.common.record.Records.OFFSET_OFFSET;
+import static org.apache.kafka.common.record.Records.SIZE_OFFSET;
+
+public class RemoteLogInputStream implements LogInputStream {
+private final InputStream is;
+private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(HEADER_SIZE_UP_TO_MAGIC);
+
+public RemoteLogInputStream(InputStream is) {
+this.is = is;
+}
+
+@Override
+public RecordBatch nextBatch() throws IOException {
+logHeaderBuffer.rewind();
+Utils.readFully(is, logHeaderBuffer);
+
+if (logHeaderBuffer.position() < HEADER_SIZE_UP_TO_MAGIC)
+return null;
+
+logHeaderBuffer.rewind();
+logHeaderBuffer.getLong(OFFSET_OFFSET);

Review comment:
   Good point. It is not really needed. It looks like it was added earlier to log the offset, but the log statement was later removed.
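The call is removable because `ByteBuffer.getLong(int index)` is an absolute read: it does not advance the buffer's position, and apart from a bounds check (which cannot fail on a buffer of this size) it has no side effect, so discarding its result makes it a no-op. A small sketch, assuming the log header layout where the 8-byte base offset starts at index 0 and the 4-byte size at index 8 (the values `Records.OFFSET_OFFSET` and `Records.SIZE_OFFSET` are assumed to hold); the class is hypothetical:

```java
import java.nio.ByteBuffer;

final class AbsoluteGetSketch {
    // Assumed header layout: 8-byte base offset at index 0, 4-byte batch size at index 8.
    static final int OFFSET_OFFSET = 0;
    static final int SIZE_OFFSET = 8;
    static final int LOG_OVERHEAD = 12;

    static ByteBuffer header(long baseOffset, int batchSize) {
        ByteBuffer buf = ByteBuffer.allocate(LOG_OVERHEAD);
        // Absolute puts write at the given index and leave the position at 0.
        buf.putLong(OFFSET_OFFSET, baseOffset);
        buf.putInt(SIZE_OFFSET, batchSize);
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer buf = header(42L, 100);
        buf.getLong(OFFSET_OFFSET); // result discarded: position and contents are unchanged
        System.out.println(buf.position());          // 0
        System.out.println(buf.getInt(SIZE_OFFSET)); // 100
    }
}
```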








[jira] [Comment Edited] (KAFKA-12635) Mirrormaker 2 offset sync is incorrect if the target partition is empty

2022-02-04 Thread Federico Valeri (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486576#comment-17486576
 ] 

Federico Valeri edited comment on KAFKA-12635 at 2/4/22, 8:51 AM:
--

I was able to reproduce the issue on Kafka 2.7.2 and 2.8.1, but not on 3.1.0.

State of the source cluster after producing and consuming 1 million records:

{code}
$ $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server :9090 --describe --topic my-topic
Topic: my-topic  PartitionCount: 3  ReplicationFactor: 3  Configs: min.insync.replicas=2,message.format.version=2.8-IV1
Topic: my-topic  Partition: 0  Leader: 2  Replicas: 2,0,1  Isr: 2,0,1
Topic: my-topic  Partition: 1  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
Topic: my-topic  Partition: 2  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0

$ $KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server :9090 --describe --group my-group

Consumer group 'my-group' has no active members.

GROUP     TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      CONSUMER-ID  HOST  CLIENT-ID
my-group  my-topic  0          332600          332600          0        -            -     -
my-group  my-topic  1          335510          335510          0        -            -     -
my-group  my-topic  2          331890          331890          0        -            -     -
{code}

State of the target cluster after MM2 has done its job 
(sync.group.offsets.enabled = true):

{code}
$ $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server :9093 --describe --topic my-topic
Topic: my-topic  PartitionCount: 3  ReplicationFactor: 3  Configs: min.insync.replicas=2,message.format.version=2.8-IV1
Topic: my-topic  Partition: 0  Leader: 3  Replicas: 3,4,5  Isr: 3,4,5
Topic: my-topic  Partition: 1  Leader: 4  Replicas: 4,5,3  Isr: 4,5,3
Topic: my-topic  Partition: 2  Leader: 5  Replicas: 5,3,4  Isr: 5,3,4

$ $KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server :9093 --describe --group my-group

Consumer group 'my-group' has no active members.

GROUP     TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      CONSUMER-ID  HOST  CLIENT-ID
my-group  my-topic  0          332600          0               -332600  -            -     -
my-group  my-topic  1          335510          0               -335510  -            -     -
my-group  my-topic  2          331890          0               -331890  -            -     -
{code}

There is actually no need to set a custom value for retention.ms in order to 
trigger the issue.
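The negative LAG values follow from how the tool computes lag: LOG-END-OFFSET minus CURRENT-OFFSET. On the target cluster the partitions report a log-end offset of 0 while the group's committed offsets were copied literally from the source, so the difference is negative. A worked sketch with the numbers above (the class is hypothetical):

```java
final class LagSketch {
    // LAG as shown by kafka-consumer-groups.sh: LOG-END-OFFSET minus CURRENT-OFFSET.
    static long lag(long logEndOffset, long currentOffset) {
        return logEndOffset - currentOffset;
    }

    public static void main(String[] args) {
        long[] targetCommitted = {332600L, 335510L, 331890L}; // copied literally from the source group
        long targetLogEnd = 0L;                              // target partitions report log-end offset 0
        for (int partition = 0; partition < targetCommitted.length; partition++) {
            System.out.println("partition " + partition + " lag " + lag(targetLogEnd, targetCommitted[partition]));
        }
    }
}
```

A committed offset of 0 on the target, which the issue describes as the correct translated value for an empty partition, would give a lag of 0 instead.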


was (Author: fvaleri):
I was able to reproduce the issue on Kafka 2.7.2 and 2.8.1, but not on 3.1.0.

State of the source cluster after producing and consuming 1 million records:

{code}
$ $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server :9090 --describe --topic my-topic
Topic: my-topic  PartitionCount: 3  ReplicationFactor: 3  Configs: min.insync.replicas=2,message.format.version=2.8-IV1
Topic: my-topic  Partition: 0  Leader: 2  Replicas: 2,0,1  Isr: 2,0,1
Topic: my-topic  Partition: 1  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
Topic: my-topic  Partition: 2  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0

$ $KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server :9090 --describe --group my-group

Consumer group 'my-group' has no active members.

GROUP     TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      CONSUMER-ID  HOST  CLIENT-ID
my-group  my-topic  0          332600          332600          0        -            -     -
my-group  my-topic  1          335510          335510          0        -            -     -
my-group  my-topic  2          331890          331890          0        -            -     -
{code}

State of the target cluster after MM2 has done its job 
(sync.group.offsets.enabled = true, replication.policy.class = 
io.strimzi.kafka.connect.mirror.IdentityReplicationPolicy):

{code}
$ $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server :9093 --describe --topic my-topic
Topic: my-topic  PartitionCount: 3  ReplicationFactor: 3  Configs: min.insync.replicas=2,message.format.version=2.8-IV1
Topic: my-topic  Partition: 0  Leader: 3  Replicas: 3,4,5  Isr: 3,4,5
Topic: my-topic  Partition: 1  Leader: 4  Replicas: 4,5,3  Isr: 4,5,3
Topic: my-topic  Partition: 2  Leader: 5  Replicas: 5,3,4  Isr: 5,3,4

$ $KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server :9093 --describe --group my-

[jira] (KAFKA-12635) Mirrormaker 2 offset sync is incorrect if the target partition is empty

2022-02-04 Thread Federico Valeri (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-12635 ]


Federico Valeri deleted comment on KAFKA-12635:
-

was (Author: fvaleri):
I was able to reproduce the issue on Kafka 2.7.2 and 2.8.1, but not on 3.1.0.

State of the source cluster after producing and consuming 1 million records:

{code}
$ $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server :9090 --describe --topic my-topic
Topic: my-topic  PartitionCount: 3  ReplicationFactor: 3  Configs: min.insync.replicas=2,message.format.version=2.8-IV1
Topic: my-topic  Partition: 0  Leader: 2  Replicas: 2,0,1  Isr: 2,0,1
Topic: my-topic  Partition: 1  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
Topic: my-topic  Partition: 2  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0

$ $KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server :9090 --describe --group my-group

Consumer group 'my-group' has no active members.

GROUP     TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      CONSUMER-ID  HOST  CLIENT-ID
my-group  my-topic  0          332600          332600          0        -            -     -
my-group  my-topic  1          335510          335510          0        -            -     -
my-group  my-topic  2          331890          331890          0        -            -     -
{code}

State of the target cluster after MM2 has done its job 
(sync.group.offsets.enabled = true):

{code}
$ $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server :9093 --describe --topic my-topic
Topic: my-topic  PartitionCount: 3  ReplicationFactor: 3  Configs: min.insync.replicas=2,message.format.version=2.8-IV1
Topic: my-topic  Partition: 0  Leader: 3  Replicas: 3,4,5  Isr: 3,4,5
Topic: my-topic  Partition: 1  Leader: 4  Replicas: 4,5,3  Isr: 4,5,3
Topic: my-topic  Partition: 2  Leader: 5  Replicas: 5,3,4  Isr: 5,3,4

$ $KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server :9093 --describe --group my-group

Consumer group 'my-group' has no active members.

GROUP     TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      CONSUMER-ID  HOST  CLIENT-ID
my-group  my-topic  0          332600          0               -332600  -            -     -
my-group  my-topic  1          335510          0               -335510  -            -     -
my-group  my-topic  2          331890          0               -331890  -            -     -
{code}

There is actually no need to set a custom value for retention.ms in order to 
trigger the issue.

> Mirrormaker 2 offset sync is incorrect if the target partition is empty
> ---
>
> Key: KAFKA-12635
> URL: https://issues.apache.org/jira/browse/KAFKA-12635
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.7.0
>Reporter: Frank Yi
>Assignee: Ning Zhang
>Priority: Major
>
> This bug occurs when using Mirrormaker with "sync.group.offsets.enabled = 
> true".
> If a source partition is empty, but the source consumer group's offset for 
> that partition is non-zero, then Mirrormaker sets the target consumer group's 
> offset for that partition to the literal, not translated, offset of the 
> source consumer group. This state can be reached if the source consumer group 
> consumed some records that have since been deleted (e.g. by a retention policy), or 
> if Mirrormaker replication is set to start at "latest". This bug causes the 
> target consumer group's lag for that partition to be negative and breaks 
> offset sync for that partition until lag is positive.
> The correct behavior when the source partition is empty would be to set the 
> target offset to the translated offset, not the literal offset, which in this 
> case would always be 0. 
> Original email thread on this issue: 
> https://lists.apache.org/thread.html/r7c54ee5f57227367b911d4abffa72781772d8dd3b72d75eb65ee19f7%40%3Cusers.kafka.apache.org%3E



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13626) NullPointerException in Selector.pollSelectionKeys: channel is null

2022-02-04 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-13626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486916#comment-17486916
 ] 

Daniel Häuser commented on KAFKA-13626:
---

[~Kvicii] 

I don't know exactly what happened there. As far as I know, the machines that ran the Kafka brokers were not able to communicate with each other at that point in time. Unfortunately, I no longer have access to them or to the client configuration.

But no matter what caused this issue, I don't think the Kafka client should ever throw a NullPointerException. The exception should be something the caller can handle appropriately (e.g. an IOException on networking issues, so that you can retry). That's why I opened this bug report.

However, I can also understand if this ticket is closed, because it contains too little information to reproduce the problem.

> NullPointerException in Selector.pollSelectionKeys: channel is null
> ---
>
> Key: KAFKA-13626
> URL: https://issues.apache.org/jira/browse/KAFKA-13626
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.7.1
>Reporter: Daniel Häuser
>Priority: Minor
>
> This NullPointerException occurred while we were having networking issues.
> Unfortunately I cannot provide much more information than this stack trace 
> because this is all I got from our operations team.
> {code:java}
> java.lang.IllegalStateException: This error handler cannot process 'java.lang.NullPointerException's; no record information is available
> at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:200)
> at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112)
> at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1599)
> at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210)
> at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at java.base/java.lang.Thread.run(Thread.java:831)
> Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.kafka.common.network.KafkaChannel.id()" because "channel" is null
> at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:516)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:563)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:257)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:480)
> at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1257)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1226)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
> at jdk.internal.reflect.GeneratedMethodAccessor128.invoke(Unknown Source)
> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:567)
> at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
> at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:208)
> at jdk.proxy2/jdk.proxy2.$Proxy137.poll(Unknown Source)
> at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1410)
> at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1249)
> at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161)
> ... 3 common 

[GitHub] [kafka] ijuma commented on a change in pull request #11722: KAFKA-13630: reduce amount of time that producer network thread holds batch queue lock

2022-02-04 Thread GitBox


ijuma commented on a change in pull request #11722:
URL: https://github.com/apache/kafka/pull/11722#discussion_r799428683



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##
@@ -578,41 +587,45 @@ private boolean shouldStopDrainBatchesForPartition(ProducerBatch first, TopicPar
 } else {
 if (shouldStopDrainBatchesForPartition(first, tp))
 break;
+}
 
-boolean isTransactional = transactionManager != null && transactionManager.isTransactional();
-ProducerIdAndEpoch producerIdAndEpoch =
-transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
-ProducerBatch batch = deque.pollFirst();
-if (producerIdAndEpoch != null && !batch.hasSequence()) {
-// If the producer id/epoch of the partition do not match the latest one
-// of the producer, we update it and reset the sequence. This should be
-// only done when all its in-flight batches have completed. This is guarantee
-// in `shouldStopDrainBatchesForPartition`.
-transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);
-
-// If the batch already has an assigned sequence, then we should not change the producer id and
-// sequence number, since this may introduce duplicates. In particular, the previous attempt
-// may actually have been accepted, and if we change the producer id and sequence here, this
-// attempt will also be accepted, causing a duplicate.
-//
-// Additionally, we update the next sequence number bound for the partition, and also have
-// the transaction manager track the batch so as to ensure that sequence ordering is maintained
-// even if we receive out of order responses.
-batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
-transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
-log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
-"{} being sent to partition {}", producerIdAndEpoch.producerId,
-producerIdAndEpoch.epoch, batch.baseSequence(), tp);
-
-transactionManager.addInFlightBatch(batch);
-}
-batch.close();
-size += batch.records().sizeInBytes();
-ready.add(batch);
+// do the rest of the work by processing outside the lock
+// close() is particularly expensive
+batch = deque.pollFirst();
+}
 
-batch.drained(now);
-}
+boolean isTransactional = transactionManager != null && transactionManager.isTransactional();
+ProducerIdAndEpoch producerIdAndEpoch =
+transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
+if (producerIdAndEpoch != null && !batch.hasSequence()) {
+// If the producer id/epoch of the partition do not match the latest one
+// of the producer, we update it and reset the sequence. This should be
+// only done when all its in-flight batches have completed. This is guarantee
+// in `shouldStopDrainBatchesForPartition`.
+transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);
+
+// If the batch already has an assigned sequence, then we should not change the producer id and
+// sequence number, since this may introduce duplicates. In particular, the previous attempt
+// may actually have been accepted, and if we change the producer id and sequence here, this
+// attempt will also be accepted, causing a duplicate.
+//
+// Additionally, we update the next sequence number bound for the partition, and also have
+// the transaction manager track the batch so as to ensure that sequence ordering is maintained
+// even if we receive out of order responses.
+batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
+transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);

Review comment:
   No, not a blocker.
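The hunk shrinks the critical section to the `deque.pollFirst()` call and moves `close()` and the producer-state bookkeeping after the lock is released. A minimal sketch of that pattern, assuming (as the hunk does) that once a batch is removed from the deque no other thread reaches it through the deque; the `Batch` type and all method names are hypothetical:

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class DrainOutsideLockSketch {
    // Hypothetical batch type; close() stands in for expensive work such as finishing compression.
    static final class Batch {
        final int id;
        boolean closed;

        Batch(int id) {
            this.id = id;
        }

        void close() {
            closed = true;
        }
    }

    private final Deque<Batch> deque = new ArrayDeque<>();

    void append(Batch batch) {
        synchronized (deque) {
            deque.addLast(batch);
        }
    }

    // Holds the deque lock only for the dequeue; close() runs after the lock is released.
    Batch drainOne() {
        final Batch batch;
        synchronized (deque) {
            batch = deque.pollFirst();
        }
        if (batch != null) {
            batch.close();
        }
        return batch;
    }
}
```

Threads appending to the same deque then block only for the dequeue itself, not for the cost of closing the batch.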





[GitHub] [kafka] ijuma commented on a change in pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm

2022-02-04 Thread GitBox


ijuma commented on a change in pull request #11721:
URL: https://github.com/apache/kafka/pull/11721#discussion_r799456068



##
File path: 
clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java
##
@@ -239,6 +239,66 @@ public void testDouble() throws IOException {
 assertDoubleSerde(Double.NEGATIVE_INFINITY, 0xFFF0L);
 }
 
+private static int mathSizeOfUnsignedVarint(int value) {
+int leadingZeros = Integer.numberOfLeadingZeros(value);
+// return (38 - leadingZeros) / 7 + leadingZeros / 32;
+int leadingZerosBelow38DividedBy7 = ((38 - leadingZeros) * 0b10010010010010011) >>> 19;
+return leadingZerosBelow38DividedBy7 + (leadingZeros >>> 5);
+}
+
+@Test
+public void testSizeOfUnsignedVarintMath() {
+for (int i = 0; i < Integer.MAX_VALUE; i++) {

Review comment:
   Is this too slow for a unit test? Similar question for other tests that 
do a large range of values.
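One way to keep such a test fast: both implementations depend only on the position of the highest set bit, so checking the lowest and highest value of every bit length (plus 0) covers every case the 2^31-iteration loop does, and negative inputs as well, in 65 checks. A sketch that reuses the formula from the hunk and a conventional shift-by-7 loop as the reference; the class and method names are hypothetical:

```java
final class VarintSizeBoundaryCheck {
    // Closed form from the hunk: (38 - lz) / 7 + lz / 32, with the division by 7 done as a multiply-shift.
    static int mathSizeOfUnsignedVarint(int value) {
        int leadingZeros = Integer.numberOfLeadingZeros(value);
        int leadingZerosBelow38DividedBy7 = ((38 - leadingZeros) * 0b10010010010010011) >>> 19;
        return leadingZerosBelow38DividedBy7 + (leadingZeros >>> 5);
    }

    // Reference loop: one extra byte per 7 bits until the remaining unsigned value fits in 7 bits.
    static int simpleSizeOfUnsignedVarint(int value) {
        int bytes = 1;
        while ((value & 0xffffff80) != 0) {
            bytes += 1;
            value >>>= 7;
        }
        return bytes;
    }

    // Both functions depend only on the highest set bit, so the extremes of each bit length suffice.
    static void checkAllBitLengths() {
        check(0);
        for (int bit = 0; bit < 32; bit++) {
            int lowest = 1 << bit;               // smallest value whose highest set bit is `bit`
            int highest = lowest | (lowest - 1); // largest such value (wraps to -1 for bit 31)
            check(lowest);
            check(highest);
        }
    }

    private static void check(int value) {
        int expected = simpleSizeOfUnsignedVarint(value);
        int actual = mathSizeOfUnsignedVarint(value);
        if (expected != actual) {
            throw new AssertionError("value " + value + ": expected " + expected + " but got " + actual);
        }
    }

    public static void main(String[] args) {
        checkAllBitLengths();
        System.out.println("all bit lengths agree");
    }
}
```

In a JUnit test, `check` would become an `assertEquals` call.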

##
File path: 
clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java
##
@@ -239,6 +239,66 @@ public void testDouble() throws IOException {
 assertDoubleSerde(Double.NEGATIVE_INFINITY, 0xFFF0L);
 }
 
+private static int mathSizeOfUnsignedVarint(int value) {
+int leadingZeros = Integer.numberOfLeadingZeros(value);
+// return (38 - leadingZeros) / 7 + leadingZeros / 32;
+int leadingZerosBelow38DividedBy7 = ((38 - leadingZeros) * 0b10010010010010011) >>> 19;
+return leadingZerosBelow38DividedBy7 + (leadingZeros >>> 5);
+}
+
+@Test
+public void testSizeOfUnsignedVarintMath() {
+for (int i = 0; i < Integer.MAX_VALUE; i++) {
+final int actual = mathSizeOfUnsignedVarint(i);
+final int expected = oldSizeOfUnsignedVarint(i);
+assertEquals(expected, actual);
+}
+}
+
+/**
+ * The old well-known implementation for sizeOfUnsignedVarint
+ */
+private static int oldSizeOfUnsignedVarint(int value) {

Review comment:
   Maybe we can call this `simpleSizeOfUnsignedVarint` or something like 
that?

##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java
##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.util;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.common.utils.ByteUtils;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+@State(Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ByteUtilsBenchmark {
+private int input;
+
+@Setup(Level.Iteration)
+public void setUp() {
+input = ThreadLocalRandom.current().nextInt(2 * 1024 * 1024);
+}
+
+@Benchmark
+@Fork(3)
+@Warmup(iterations = 5, time = 1)
+@Measurement(iterations = 10, time = 1)
+public int testSizeOfUnsignedVarint() {

Review comment:
   Shall we also add methods for `sizeOfVarlong`?
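A `sizeOfVarlong` benchmark would need a 64-bit counterpart to the int formula. By analogy (a value with `64 - lz` significant bits needs `ceil((64 - lz) / 7)` bytes, and 0 still needs one byte), one candidate is `(70 - lz) / 7 + lz / 64`. The sketch below is an assumption to be validated, not the PR's code: it compares that candidate with a simple loop, uses plain division rather than the multiply-shift trick, and applies zig-zag encoding for signed values. All names are hypothetical:

```java
final class VarlongSizeSketch {
    // Reference loop: one extra byte per 7 bits until the remaining unsigned value fits in 7 bits.
    static int simpleSizeOfUnsignedVarlong(long value) {
        int bytes = 1;
        while ((value & 0xffffffffffffff80L) != 0L) {
            bytes += 1;
            value >>>= 7;
        }
        return bytes;
    }

    // Candidate closed form (assumption): ceil((64 - lz) / 7), plus 1 when value == 0 (lz == 64).
    static int mathSizeOfUnsignedVarlong(long value) {
        int leadingZeros = Long.numberOfLeadingZeros(value);
        return (70 - leadingZeros) / 7 + (leadingZeros >>> 6);
    }

    // Signed values are zig-zag encoded first so small negative numbers stay short.
    static int sizeOfVarlong(long value) {
        return mathSizeOfUnsignedVarlong((value << 1) ^ (value >> 63));
    }
}
```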

##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java
##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ 

[GitHub] [kafka] ijuma commented on a change in pull request #11722: KAFKA-13630: reduce amount of time that producer network thread holds batch queue lock

2022-02-04 Thread GitBox


ijuma commented on a change in pull request #11722:
URL: https://github.com/apache/kafka/pull/11722#discussion_r799462063



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##
@@ -448,40 +448,48 @@ public ReadyCheckResult ready(Cluster cluster, long nowMs) {
 boolean exhausted = this.free.queued() > 0;
 for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
 Deque<ProducerBatch> deque = entry.getValue();
+
+final ProducerBatch batch;
+final long waitedTimeMs;
+final boolean backingOff;
+final boolean full;
+
+// Collect as little as possible inside critical region, determine outcome after release
 synchronized (deque) {
-// When producing to a large number of partitions, this path is hot and deques are often empty.
-// We check whether a batch exists first to avoid the more expensive checks whenever possible.

Review comment:
   Why did we remove the information on why this is performance sensitive?
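The removed comment documented why the empty-deque check comes first: with many partitions most deques are empty, so returning early skips the more expensive checks on a hot path. A simplified sketch that keeps that early exit while still snapshotting only a few values under the lock and deciding outside it; it ignores backoff, leader lookup, flushes and transactions, and every name in it is hypothetical:

```java
import java.util.Deque;

final class ReadyCheckSketch {
    // Hypothetical batch exposing only what this check needs.
    static final class Batch {
        final long createdMs;
        final boolean full;

        Batch(long createdMs, boolean full) {
            this.createdMs = createdMs;
            this.full = full;
        }
    }

    static boolean isSendable(Deque<Batch> deque, long nowMs, long lingerMs) {
        final long waitedTimeMs;
        final boolean full;
        synchronized (deque) {
            // Hot path: with many partitions most deques are empty, so exit before any other work.
            Batch batch = deque.peekFirst();
            if (batch == null) {
                return false;
            }
            // Snapshot the few values needed, then release the lock.
            waitedTimeMs = nowMs - batch.createdMs;
            full = deque.size() > 1 || batch.full;
        }
        // Decide outside the critical region.
        boolean expired = waitedTimeMs >= lingerMs;
        return full || expired;
    }
}
```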

##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##
@@ -448,40 +448,48 @@ public ReadyCheckResult ready(Cluster cluster, long nowMs) {
 boolean exhausted = this.free.queued() > 0;
 for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
 Deque<ProducerBatch> deque = entry.getValue();
+
+final ProducerBatch batch;
+final long waitedTimeMs;
+final boolean backingOff;
+final boolean full;
+
+// Collect as little as possible inside critical region, determine outcome after release
 synchronized (deque) {
-// When producing to a large number of partitions, this path is hot and deques are often empty.
-// We check whether a batch exists first to avoid the more expensive checks whenever possible.
-ProducerBatch batch = deque.peekFirst();
-if (batch != null) {
-TopicPartition part = entry.getKey();
-Node leader = cluster.leaderFor(part);
-if (leader == null) {
-// This is a partition for which leader is not known, but messages are available to send.
-// Note that entries are currently not removed from batches when deque is empty.
-unknownLeaderTopics.add(part.topic());
-} else if (!readyNodes.contains(leader) && !isMuted(part)) {
-long waitedTimeMs = batch.waitedTimeMs(nowMs);
-boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
-long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
-boolean full = deque.size() > 1 || batch.isFull();
-boolean expired = waitedTimeMs >= timeToWaitMs;
-boolean transactionCompleting = transactionManager != null && transactionManager.isCompleting();
-boolean sendable = full
-|| expired
-|| exhausted
-|| closed
-|| flushInProgress()
-|| transactionCompleting;
-if (sendable && !backingOff) {
-readyNodes.add(leader);
-} else {
-long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
-// Note that this results in a conservative estimate since an un-sendable partition may have
-// a leader that will later be found to have sendable data. However, this is good enough
-// since we'll just wake up and then sleep again for the remaining time.
-nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
-}
-}
+batch = deque.peekFirst();

Review comment:
   Out of curiosity, do these make much of a difference? We can probably 
keep them, but it seems like the big problem was the `close` call being inside 
the lock (in the other method).
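The locking pattern in this diff can be sketched in isolation. A few values are snapshotted under `synchronized (deque)`, and the decision is made after the lock is released. This is a simplified, hypothetical example. `SketchBatch` and `ReadyCheckSketch` are illustrative stand-ins, not Kafka's `ProducerBatch`/`RecordAccumulator`. The sketch also omits the leader and mute checks and the other sendability conditions in the original code (`exhausted`, `closed`, `flushInProgress()`, transaction completion).

```java
import java.util.Deque;

// Hypothetical, simplified stand-in for a producer batch (not Kafka's ProducerBatch).
final class SketchBatch {
    final long createdMs;
    final int attempts;
    final boolean isFull;

    SketchBatch(long createdMs, int attempts, boolean isFull) {
        this.createdMs = createdMs;
        this.attempts = attempts;
        this.isFull = isFull;
    }
}

final class ReadyCheckSketch {
    private ReadyCheckSketch() {}

    /** Returns true if the head batch of the deque is ready to send at nowMs. */
    static boolean isReady(Deque<SketchBatch> deque, long nowMs, long lingerMs, long retryBackoffMs) {
        final SketchBatch batch;
        final long waitedTimeMs;
        final boolean backingOff;
        final boolean full;

        // Critical region: read only the state that needs the deque's lock.
        synchronized (deque) {
            batch = deque.peekFirst();
            if (batch == null) {
                return false; // hot path: with many partitions, deques are often empty
            }
            waitedTimeMs = Math.max(nowMs - batch.createdMs, 0L);
            backingOff = batch.attempts > 0 && waitedTimeMs < retryBackoffMs;
            full = deque.size() > 1 || batch.isFull;
        }

        // Outside the lock: derive the outcome from the snapshot taken above.
        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
        boolean expired = waitedTimeMs >= timeToWaitMs;
        return (full || expired) && !backingOff;
    }
}
```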




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13600) Rebalances while streams is in degraded state can cause stores to be reassigned and restore from scratch

2022-02-04 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17487148#comment-17487148
 ] 

Bruno Cadonna commented on KAFKA-13600:
---

I am fine with discussing the improvement on the PR rather than in a KIP. I 
actually realized that the other improvements to the assignment algorithm 
included changes to the public API, and therefore a KIP was needed. For me, it is 
just important that we look really carefully at the improvements, because the 
assignment algorithm is quite a critical part of the system.

Additionally, I did not want to discuss a totally new assignment 
algorithm. I just linked the information for general interest.

Looking forward to the PR.

> Rebalances while streams is in degraded state can cause stores to be 
> reassigned and restore from scratch
> 
>
> Key: KAFKA-13600
> URL: https://issues.apache.org/jira/browse/KAFKA-13600
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0, 2.8.1, 3.0.0
>Reporter: Tim Patterson
>Priority: Major
>
> Consider this scenario:
>  # A node is lost from the cluster.
>  # A rebalance is kicked off with a new "target assignment" (i.e. the 
> rebalance is attempting to move a lot of tasks; see 
> https://issues.apache.org/jira/browse/KAFKA-10121).
>  # The Kafka cluster is now a bit more sluggish from the increased load.
>  # A rolling deploy happens, triggering rebalances. During the rebalance, 
> processing continues but offsets can't be committed (or nodes are restarted 
> but fail to commit offsets).
>  # The most caught-up nodes now aren't within `acceptableRecoveryLag`, so 
> the task is started in its "target assignment" location, restoring all state 
> from scratch and delaying further processing instead of using the "almost 
> caught-up" node.
> We've hit this a few times, and since we have lots of state (~25TB worth) and are 
> heavy users of IQ, this is not ideal for us.
> While we can increase `acceptableRecoveryLag` to larger values to try to get 
> around this, that causes other issues (i.e. a warmup becoming active when it's 
> still quite far behind).
> The solution seems to be to balance "balanced assignment" with "most caught-up 
> nodes".
> We've got a fork where we do just this, and it's made a huge difference to the 
> reliability of our cluster.
> Our change is simply to use the most caught-up node if the "target node" is 
> more than `acceptableRecoveryLag` behind.
> This gives up some of the load-balancing behaviour of the existing code, 
> but in practice doesn't seem to matter too much.
> Perhaps an algorithm that identified candidate nodes as those 
> within `acceptableRecoveryLag` of the most caught-up node might give the 
> best of both worlds.
>  
> Our fork is
> [https://github.com/apache/kafka/compare/trunk...tim-patterson:fix_balance_uncaughtup?expand=1]
> (We also moved the capacity constraint code to run after all the stateful 
> assignment, to prioritise standby tasks over warmup tasks.)
> Ideally we don't want to maintain a fork of Kafka Streams going forward, so we 
> are hoping to get a bit of discussion / agreement on the best way to handle 
> this.
> We're more than happy to contribute code, test different algorithms in our production system, 
> or do anything else to help with this issue.
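The candidate-node idea from the issue text can be sketched as follows. This is a hypothetical illustration and does not use the Kafka Streams assignor API. It assumes the per-client lags for a single task are already known, as a `Map` from client ID to lag. It keeps the balanced "target" client when that client is within `acceptableRecoveryLag` of the most caught-up client. Otherwise it falls back to the most caught-up client.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the "candidate nodes" idea; not the Kafka Streams task assignor.
final class CaughtUpCandidatesSketch {
    private CaughtUpCandidatesSketch() {}

    /** Clients whose lag is within acceptableRecoveryLag of the most caught-up client, sorted by ID. */
    static List<String> candidates(Map<String, Long> lagByClient, long acceptableRecoveryLag) {
        if (lagByClient.isEmpty()) {
            return Collections.emptyList();
        }
        long minLag = Collections.min(lagByClient.values());
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> e : lagByClient.entrySet()) {
            if (e.getValue() - minLag <= acceptableRecoveryLag) {
                result.add(e.getKey());
            }
        }
        Collections.sort(result); // deterministic order
        return result;
    }

    /** Keeps the balanced target if it is a candidate; otherwise picks the most caught-up client. */
    static String chooseActive(String target, Map<String, Long> lagByClient, long acceptableRecoveryLag) {
        if (lagByClient.isEmpty() || candidates(lagByClient, acceptableRecoveryLag).contains(target)) {
            return target;
        }
        // Fallback: lowest lag wins; iterating a TreeMap breaks ties by client ID.
        String best = null;
        long bestLag = Long.MAX_VALUE;
        for (Map.Entry<String, Long> e : new TreeMap<>(lagByClient).entrySet()) {
            if (best == null || e.getValue() < bestLag) {
                best = e.getKey();
                bestLag = e.getValue();
            }
        }
        return best;
    }
}
```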



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13638) Slow KTable update when forwarding multiple values from transformer

2022-02-04 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17487184#comment-17487184
 ] 

Bruno Cadonna commented on KAFKA-13638:
---

[~Lejon] How did you measure time with your test? I cannot really reproduce 
your numbers. 

> Slow KTable update when forwarding multiple values from transformer
> ---
>
> Key: KAFKA-13638
> URL: https://issues.apache.org/jira/browse/KAFKA-13638
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0, 3.0.0
>Reporter: Ulrik
>Priority: Major
> Attachments: KafkaTest.java
>
>
> I have a topology where I stream messages from an input topic, transform the 
> message to multiple messages (via context.forward), and then store those 
> messages in a KTable.
> Since upgrading from kafka-streams 2.8.1 to 3.1.0, I have noticed that my 
> tests take significantly longer to run.
>  
> I have attached a test class to demonstrate my scenario. When running this 
> test with kafka-streams versions 2.8.1 and 3.1.0, I came up with the following 
> numbers:
>  
> *Version 2.8.1*
>  * one input message and one output message: 541 ms
>  * 8 input messages and 30 output messages per input message (240 output 
> messages in total): 919 ms
>  
> *Version 3.1.0*
>  * one input message and one output message: 908 ms
>  * 8 input messages and 30 output messages per input message (240 output 
> messages in total): 6 sec 94 ms
>  
> Even when the transformer just transforms and forwards one input message to 
> one output message, the test takes approx. 400 ms longer to run.
> When transforming 8 input messages to 240 output messages, it takes approx. 5 
> seconds longer.





[jira] [Commented] (KAFKA-13638) Slow KTable update when forwarding multiple values from transformer

2022-02-04 Thread Ulrik (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17487211#comment-17487211
 ] 

Ulrik commented on KAFKA-13638:
---

[~cadonna] I ran the test from within IntelliJ and just took the numbers from 
IntelliJ, which displays the running time for the test.
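A caveat with IDE-reported durations is that they may also include test setup and teardown, and JVM class loading on the first run. They do not isolate the record processing being compared. The sketch below times only the step of interest with `System.nanoTime()`. It assumes that step can be wrapped in a `Runnable`, and the helper name is hypothetical.

```java
// Hypothetical helper: times only the supplied work, not JVM startup or test fixtures.
final class ElapsedTimeSketch {
    private ElapsedTimeSketch() {}

    /** Runs work once and returns the elapsed wall-clock time in milliseconds. */
    static long timeMillis(Runnable work) {
        long start = System.nanoTime(); // monotonic clock, suitable for intervals
        work.run();
        return (System.nanoTime() - start) / 1_000_000L;
    }
}
```

For example, you could wrap only the loop that pipes the input records. That would leave topology and test-driver construction out of the measured interval.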



[GitHub] [kafka] hachikuji commented on a change in pull request #11691: KAFKA-13598: enable idempotence producer by default and validate the configs

2022-02-04 Thread GitBox


hachikuji commented on a change in pull request #11691:
URL: https://github.com/apache/kafka/pull/11691#discussion_r799691615



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
##
@@ -514,12 +512,11 @@ public ProducerConfig(Map<String, Object> props) {
 }
 
 boolean idempotenceEnabled() {
-boolean userConfiguredIdempotence = this.originals().containsKey(ENABLE_IDEMPOTENCE_CONFIG);
 boolean userConfiguredTransactions = this.originals().containsKey(TRANSACTIONAL_ID_CONFIG);
-boolean idempotenceEnabled = userConfiguredIdempotence && this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
-
-if (!idempotenceEnabled && userConfiguredIdempotence && userConfiguredTransactions)
+boolean idempotenceEnabled = this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
+if (!idempotenceEnabled && userConfiguredTransactions)
 throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
+
 return userConfiguredTransactions || idempotenceEnabled;

Review comment:
   nit: I think the check for `userConfiguredTransactions` is redundant 
now. 
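The redundancy follows from the guard. If the method does not throw, `userConfiguredTransactions` implies `idempotenceEnabled`, so `userConfiguredTransactions || idempotenceEnabled` always equals `idempotenceEnabled`. Below is a standalone sketch of the simplified logic. Two substitutions are made for illustration: plain booleans replace the config lookups, and `IllegalStateException` stands in for Kafka's `ConfigException`.

```java
// Hypothetical standalone sketch of the simplified check; not the actual ProducerConfig code.
final class IdempotenceCheckSketch {
    private IdempotenceCheckSketch() {}

    static boolean idempotenceEnabled(boolean enableIdempotence, boolean userConfiguredTransactions) {
        if (!enableIdempotence && userConfiguredTransactions) {
            throw new IllegalStateException(
                "Cannot set a transactional.id without also enabling idempotence.");
        }
        // Past the guard, userConfiguredTransactions implies enableIdempotence,
        // so "userConfiguredTransactions || enableIdempotence" reduces to this.
        return enableIdempotence;
    }
}
```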

##
File path: docs/upgrade.html
##
@@ -19,6 +19,13 @@
 
 

[GitHub] [kafka] ijuma commented on a change in pull request #11691: KAFKA-13598: enable idempotence producer by default and validate the configs

2022-02-04 Thread GitBox


ijuma commented on a change in pull request #11691:
URL: https://github.com/apache/kafka/pull/11691#discussion_r799697679



##
File path: docs/upgrade.html
##
@@ -19,6 +19,13 @@
 
 

[GitHub] [kafka] cmccabe commented on pull request #11659: KAFKA-13503: Validate broker configs for KRaft

2022-02-04 Thread GitBox


cmccabe commented on pull request #11659:
URL: https://github.com/apache/kafka/pull/11659#issuecomment-1030241564


   Thanks for the PR, @dengziming.
   
   The broker validation needs to be done on the broker side, since the broker 
has access to information that the controller does not. For example, the broker 
knows whether a given file exists, and so can validate whether it is reasonable 
to set the SSL certificate to that file.
   
   KAFKA-13552 added this validation on the broker side. So I think we should 
close this PR as a duplicate.






[jira] [Resolved] (KAFKA-13503) Validate broker configs for KRaft

2022-02-04 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-13503.
--
Fix Version/s: 3.2.0
 Reviewer: Jose Armando Garcia Sancio
 Assignee: Colin McCabe  (was: dengziming)
   Resolution: Fixed

> Validate broker configs for KRaft
> -
>
> Key: KAFKA-13503
> URL: https://issues.apache.org/jira/browse/KAFKA-13503
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
>  Labels: kip-500
> Fix For: 3.2.0
>
>






[jira] [Updated] (KAFKA-13552) Unable to dynamically change broker log levels on KRaft

2022-02-04 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-13552:
-
Fix Version/s: 3.2.0

> Unable to dynamically change broker log levels on KRaft
> ---
>
> Key: KAFKA-13552
> URL: https://issues.apache.org/jira/browse/KAFKA-13552
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.1.0, 3.0.0
>Reporter: Ron Dagostino
>Assignee: Colin McCabe
>Priority: Major
> Fix For: 3.2.0
>
>
> It is currently not possible to dynamically change the log level in KRaft.  
> For example:
> kafka-configs.sh --bootstrap-server  --alter --add-config 
> "kafka.server.ReplicaManager=DEBUG" --entity-type broker-loggers 
> --entity-name 0
> Results in:
> org.apache.kafka.common.errors.InvalidRequestException: Unexpected resource 
> type BROKER_LOGGER.
> The code to process this request is in ZkAdminManager.alterLogLevelConfigs(). 
>  This needs to be moved out of there, and the functionality has to be 
> processed locally on the broker instead of being forwarded to the KRaft 
> controller.
> It is also an open question as to how we can dynamically alter log levels for 
> a remote KRaft controller.  Connecting directly to it is one possible 
> solution, but that may not be desirable since generally connecting directly 
> to the controller is not necessary.  The ticket for this particular aspect of 
> the issue is https://issues.apache.org/jira/browse/KAFKA-13502





[jira] [Commented] (KAFKA-13503) Validate broker configs for KRaft

2022-02-04 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17487221#comment-17487221
 ] 

Colin McCabe commented on KAFKA-13503:
--

The PR for KAFKA-13552 added broker-side validation as well. So I think we can 
close this now.

{code}
commit 68a19539cf1fcd86787960d0010b672d0d611b91
Author: Colin Patrick McCabe 
Date:   Fri Jan 21 16:00:21 2022 -0800

KAFKA-13552: Fix BROKER and BROKER_LOGGER in KRaft (#11657)

{code}


> Validate broker configs for KRaft
> -
>
> Key: KAFKA-13503
> URL: https://issues.apache.org/jira/browse/KAFKA-13503
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: dengziming
>Priority: Major
>  Labels: kip-500
>






[GitHub] [kafka] guozhangwang commented on a change in pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2022-02-04 Thread GitBox


guozhangwang commented on a change in pull request #11424:
URL: https://github.com/apache/kafka/pull/11424#discussion_r799712506



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1034,14 +1037,15 @@ private static Metrics getMetrics(final StreamsConfig config, final Time time, f
 final StreamThread streamThread;
 synchronized (changeThreadCount) {
 final int threadIdx = getNextThreadIndex();
-final int numLiveThreads = getNumLiveStreamThreads();
-final long cacheSizePerThread = getCacheSizePerThread(numLiveThreads + 1);
-log.info("Adding StreamThread-{}, there will now be {} live threads and the new cache size per thread is {}",
- threadIdx, numLiveThreads + 1, cacheSizePerThread);
-resizeThreadCache(cacheSizePerThread);
 // Creating thread should hold the lock in order to avoid duplicate thread index.
 // If the duplicate index happen, the metadata of thread may be duplicate too.
-streamThread = createAndAddStreamThread(cacheSizePerThread, threadIdx);
+// Also, we create the new thread with initial values of cache size and max buffer size as 0
+// and then resize them later
+streamThread = createAndAddStreamThread(0L, 0L, threadIdx);
+final int numLiveThreads = getNumLiveStreamThreads();
+resizeThreadCacheAndBufferMemory(numLiveThreads + 1);

Review comment:
   > I was wondering: why do we need to do this post-construction resizing?
   
   The main motivation is to consolidate the resizing of the thread cache and 
the input buffer into a single call. More details can be found in this comment thread 
(https://github.com/apache/kafka/pull/11424#discussion_r774058848). I suggested that we 
initialize the new thread with a value of 0 (rather than an arbitrary value) and 
then resize, since at that point we have the correct number of threads to divide by.
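A combined resize comes down to simple arithmetic: the total cache size and the total input buffer size are both divided by the same live-thread count in one call. The sketch below shows that calculation. The names are hypothetical, and the sketch assumes an even split. It ignores any extra handling the actual `KafkaStreams` code may apply, for example for a global thread.

```java
// Hypothetical sketch: split application-wide byte budgets evenly across live stream threads.
final class PerThreadBudgetSketch {
    private PerThreadBudgetSketch() {}

    static long perThread(long totalBytes, int numLiveThreads) {
        if (numLiveThreads <= 0) {
            return totalBytes; // no live threads: avoid dividing by zero
        }
        return totalBytes / numLiveThreads;
    }

    /** One call computes both per-thread sizes from the same thread count: {cache, input buffer}. */
    static long[] resize(long totalCacheBytes, long totalInputBufferBytes, int numLiveThreads) {
        return new long[] {
            perThread(totalCacheBytes, numLiveThreads),
            perThread(totalInputBufferBytes, numLiveThreads)
        };
    }
}
```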








[jira] [Resolved] (KAFKA-9154) ProducerId generation should be managed by the Controller

2022-02-04 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-9154.
-
Fix Version/s: 3.1.0
   Resolution: Fixed

We implemented KIP-730 in Kafka 3.1. Closing this.

> ProducerId generation should be managed by the Controller
> -
>
> Key: KAFKA-9154
> URL: https://issues.apache.org/jira/browse/KAFKA-9154
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Reporter: Viktor Somogyi-Vass
>Assignee: David Arthur
>Priority: Major
>  Labels: kip-500
> Fix For: 3.1.0
>
>
> Currently, producerIds are maintained in ZooKeeper, but in the future we'd like 
> them to be managed by the controller quorum in an internal topic.
> The reason for storing them in ZooKeeper was that they must be unique across 
> the cluster. In this task, the code should be refactored so that the 
> TransactionManager asks the Controller for a ProducerId, and the Controller connects to 
> ZooKeeper to acquire this ID. Since ZK is the single source of truth and the 
> PID won't be cached anywhere, this should be safe (it adds just one extra hop).





[jira] [Commented] (KAFKA-9837) New RPC for notifying controller of failed replica

2022-02-04 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17487226#comment-17487226
 ] 

Colin McCabe commented on KAFKA-9837:
-

I'm sorry that there hasn't been any activity on this one in a while (there was 
more on the PR). The issue here is that we don't want to add an RPC which will 
be O(num_partitions_in_dir). It will just become unworkable as directory sizes 
increase, the same as the old controller RPCs which were like this. So we need 
to start keeping metadata on the controller about which partitions are in which 
directory. However, that would require a KIP which we haven't had time to do 
yet...

> New RPC for notifying controller of failed replica
> --
>
> Key: KAFKA-9837
> URL: https://issues.apache.org/jira/browse/KAFKA-9837
> Project: Kafka
>  Issue Type: New Feature
>  Components: controller, core
>Reporter: David Arthur
>Assignee: dengziming
>Priority: Major
>  Labels: kip-500
> Fix For: 3.2.0
>
>
> This is the tracking ticket for 
> [KIP-589|https://cwiki.apache.org/confluence/display/KAFKA/KIP-589+Add+API+to+update+Replica+state+in+Controller].
>  For the bridge release, brokers should no longer use ZooKeeper to notify the 
> controller that a log dir has failed. It should instead use an RPC mechanism.





[jira] [Commented] (KAFKA-10724) Command to run single quorum in raft is missing "--config" parameters.

2022-02-04 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17487230#comment-17487230
 ] 

Colin McCabe commented on KAFKA-10724:
--

The command has been renamed to {{raft/bin/test-kraft-server-start.sh}}, and it 
does have a {{--config}} flag now:

{code}
raft/bin/test-kraft-server-start.sh
Standalone raft server for performance testing
Option  Description   
--  ---   
--config  Required configured file  
--help  Print usage information.  
--record-size   The size of each record (default: 256)
--throughput  The number of records per second the  
  leader will write to the metadata   
  topic (default: 5000)   
--version   Display Kafka version.
{code}

(To be clear, this is just an internal testing tool, not something that 
end-users should use)

Closing.

> Command to run single quorum in raft is missing "--config" parameters.
> --
>
> Key: KAFKA-10724
> URL: https://issues.apache.org/jira/browse/KAFKA-10724
> Project: Kafka
>  Issue Type: Bug
>  Components: core, docs
>Reporter: huldar chen
>Priority: Major
>  Labels: kip-500
>
> When I run "bin/test-raft-server-start.sh config/raft.properties", I get an 
> error:
> [2020-11-14 23:00:38,742] ERROR Exiting Kafka due to fatal exception 
> (kafka.tools.TestRaftServer$)
> org.apache.kafka.common.config.ConfigException: Missing required 
> configuration "zookeeper.connect" which has no default value.
>  at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:478)
>  at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:468)
>  at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108)
>  at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:142)
>  at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1314)
>  at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1317)
>  at kafka.tools.TestRaftServer$.main(TestRaftServer.scala:607)
>  at kafka.tools.TestRaftServer.main(TestRaftServer.scala)
> The correct command is “./bin/test-raft-server-start.sh --config 
> ./config/raft.properties”





[jira] [Assigned] (KAFKA-10724) Command to run single quorum in raft is missing "--config" parameters.

2022-02-04 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe reassigned KAFKA-10724:


Assignee: Jason Gustafson



[jira] [Resolved] (KAFKA-10724) Command to run single quorum in raft is missing "--config" parameters.

2022-02-04 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-10724.
--
Resolution: Fixed



[jira] [Resolved] (KAFKA-12209) Add the timeline data structures for the KIP-631 controller

2022-02-04 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-12209.
--
Fix Version/s: 2.8.0
 Assignee: Colin McCabe
   Resolution: Fixed

> Add the timeline data structures for the KIP-631 controller
> ---
>
> Key: KAFKA-12209
> URL: https://issues.apache.org/jira/browse/KAFKA-12209
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
>  Labels: kip-500
> Fix For: 2.8.0
>
>






[jira] [Resolved] (KAFKA-12214) Generated code does not include UUID or struct fields in its toString output

2022-02-04 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-12214.
--
Resolution: Fixed

> Generated code does not include UUID or struct fields in its toString output
> 
>
> Key: KAFKA-12214
> URL: https://issues.apache.org/jira/browse/KAFKA-12214
> Project: Kafka
>  Issue Type: Bug
>  Components: generator
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
>  Labels: kip-500
>
> The generated code does not include UUID or struct fields in its toString 
> output.





[jira] [Resolved] (KAFKA-12271) Expose consistent Raft metadata to components

2022-02-04 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-12271.
--
Fix Version/s: 3.0.0
   Resolution: Fixed

> Expose consistent Raft metadata to components
> -
>
> Key: KAFKA-12271
> URL: https://issues.apache.org/jira/browse/KAFKA-12271
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Arthur
>Assignee: Colin McCabe
>Priority: Minor
>  Labels: kip-500
> Fix For: 3.0.0
>
>
> In Raft clusters, we need a way of exposing consistent metadata to components 
> such as ReplicaManager, LogManager, etc. 
> This is done through a new class named MetadataImage. As Raft metadata 
> records are processed, new MetadataImage instances are built and provided to the 
> MetadataCache atomically. This avoids readers seeing any partially 
> materialized metadata.
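The pattern described here can be sketched with an `AtomicReference`: build a complete new immutable image, then publish it in one step. `ImageSketch` and `CacheSketch` are hypothetical names, and the image content is illustrative. The sketch assumes a single writer thread applies records, so a plain `set` is enough. It is not Kafka's `MetadataImage`/`MetadataCache` code.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical immutable snapshot; the topic -> partition-count content is illustrative only.
final class ImageSketch {
    final long offset;
    final Map<String, Integer> partitionCounts;

    ImageSketch(long offset, Map<String, Integer> partitionCounts) {
        this.offset = offset;
        this.partitionCounts = Collections.unmodifiableMap(new HashMap<>(partitionCounts));
    }
}

final class CacheSketch {
    private final AtomicReference<ImageSketch> current =
        new AtomicReference<>(new ImageSketch(-1L, Collections.emptyMap()));

    /** Readers take one snapshot and use it for the whole operation, never seeing partial updates. */
    ImageSketch image() {
        return current.get();
    }

    /** Single writer: build the complete next image off to the side, then publish it atomically. */
    void apply(long offset, String topic, int partitions) {
        Map<String, Integer> next = new HashMap<>(current.get().partitionCounts);
        next.put(topic, partitions);
        current.set(new ImageSketch(offset, next));
    }
}
```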





[jira] [Commented] (KAFKA-12421) Improve controller's atomic grouping

2022-02-04 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17487233#comment-17487233
 ] 

Colin McCabe commented on KAFKA-12421:
--

We did introduce a distinction between atomic and non-atomic writes, so let's 
close this for now.

> Improve controller's atomic grouping
> 
>
> Key: KAFKA-12421
> URL: https://issues.apache.org/jira/browse/KAFKA-12421
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Reporter: José Armando García Sancio
>Assignee: HaiyuanZhao
>Priority: Major
>  Labels: kip-500
>
> The current controller implementation atomically appends to the metadata log 
> by making sure that all required records are in the same batch. The 
> controller groups all of the records that result from an RPC into one batch. 
> Some of these RPCs are:
>  # Client quota changes
>  # Configuration changes
>  # Feature changes
>  # Topic creation
> This is good enough for correctness, but it is more aggressive than necessary. 
> For example, for topic creation, since errors are reported independently for each topic, the 
> controller only needs to guarantee that all of the records for one topic are 
> committed atomically.
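The finer-grained guarantee described above commits all records for one topic atomically, rather than all records for the whole RPC. In effect, this means grouping records by topic before appending. The hypothetical sketch below uses an illustrative `Rec` type and payload strings, and it preserves record order within each topic. It is not the controller's actual write path.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: one atomic batch per topic instead of one batch per RPC.
final class AtomicGroupingSketch {
    private AtomicGroupingSketch() {}

    /** A metadata record tagged with the topic it belongs to (illustrative type). */
    static final class Rec {
        final String topic;
        final String payload;

        Rec(String topic, String payload) {
            this.topic = topic;
            this.payload = payload;
        }
    }

    /** Groups records so each topic's records can be appended as their own atomic batch. */
    static List<List<Rec>> batchesPerTopic(List<Rec> records) {
        Map<String, List<Rec>> byTopic = new LinkedHashMap<>(); // keeps first-seen topic order
        for (Rec r : records) {
            byTopic.computeIfAbsent(r.topic, t -> new ArrayList<>()).add(r);
        }
        return new ArrayList<>(byTopic.values());
    }
}
```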





[jira] [Resolved] (KAFKA-12421) Improve controller's atomic grouping

2022-02-04 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-12421.
--
  Assignee: Jose Armando Garcia Sancio  (was: HaiyuanZhao)
Resolution: Fixed

> Improve controller's atomic grouping
> 
>
> Key: KAFKA-12421
> URL: https://issues.apache.org/jira/browse/KAFKA-12421
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Reporter: José Armando García Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>  Labels: kip-500
>
> The current controller implementation atomically appends to the metadata log 
> by making sure that all required records are on the same batch. The 
> controller groups all of the records that result from an RPC into one batch. 
> Some of the RPCs are:
>  # Client quota changes
>  # Configuration changes
>  # Feature changes
>  # Topic creation
> This is good enough for correctness but it is more aggressive than necessary. 
> For example, for topic creation since errors are reported independently, the 
> controller only needs to guarantee that all of the records for one topic are 
> committed atomically.





[jira] [Assigned] (KAFKA-12502) Quorum controller should return topic configs in CreateTopic response

2022-02-04 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe reassigned KAFKA-12502:


Assignee: Colin McCabe  (was: Ryan Dielhenn)

> Quorum controller should return topic configs in CreateTopic response
> -
>
> Key: KAFKA-12502
> URL: https://issues.apache.org/jira/browse/KAFKA-12502
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Colin McCabe
>Priority: Major
>  Labels: kip-500
>
> Configs were added to the response in version 5. 
> {code}
>   { "name": "Configs", "type": "[]CreatableTopicConfigs", "versions": "5+", "nullableVersions": "5+", "ignorable": true,
> "about": "Configuration of the topic.", "fields": [
> { "name": "Name", "type": "string", "versions": "5+",
>   "about": "The configuration name." },
> { "name": "Value", "type": "string", "versions": "5+", "nullableVersions": "5+",
>   "about": "The configuration value." },
> { "name": "ReadOnly", "type": "bool", "versions": "5+",
>   "about": "True if the configuration is read-only." },
> { "name": "ConfigSource", "type": "int8", "versions": "5+", "default": "-1", "ignorable": true,
>   "about": "The configuration source." },
> { "name": "IsSensitive", "type": "bool", "versions": "5+",
>   "about": "True if this configuration is sensitive." }
>   ]}
> {code}
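To illustrate what `"versions": "5+"` and `"ignorable": true` imply for the response, here is a hedged sketch using a hypothetical `TopicConfigEntry` record rather than Kafka's generated message classes. As I understand the message-generator semantics, the `Configs` field does not exist on the wire below version 5, and because it is ignorable, a populated list is silently dropped when writing an older version instead of causing an error. `ConfigSource` defaults to -1, as in the schema.

```java
import java.util.List;

public class CreateTopicsConfigsSketch {
    // Hypothetical mirror of one CreatableTopicConfigs entry from the schema above.
    record TopicConfigEntry(String name, String value, boolean readOnly, byte configSource, boolean isSensitive) {
        static final byte UNKNOWN_SOURCE = -1; // schema default for ConfigSource
    }

    // Returns the configs that would actually be written for a given response version.
    // Configs only exist in version 5+; since the field is ignorable, older versions
    // drop it rather than failing serialization.
    static List<TopicConfigEntry> configsToWrite(short version, List<TopicConfigEntry> configs) {
        if (version < 5) {
            return List.of();
        }
        return configs;
    }

    public static void main(String[] args) {
        List<TopicConfigEntry> configs = List.of(
            new TopicConfigEntry("cleanup.policy", "delete", false, TopicConfigEntry.UNKNOWN_SOURCE, false));
        System.out.println(configsToWrite((short) 4, configs).size()); // 0
        System.out.println(configsToWrite((short) 5, configs).size()); // 1
    }
}
```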





[jira] [Created] (KAFKA-13646) Implement KIP-801: KRaft authorizer

2022-02-04 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-13646:


 Summary: Implement KIP-801: KRaft authorizer
 Key: KAFKA-13646
 URL: https://issues.apache.org/jira/browse/KAFKA-13646
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin McCabe
Assignee: Colin McCabe








[jira] [Resolved] (KAFKA-13193) Replica manager doesn't update partition state when transitioning from leader to follower with unknown leader

2022-02-04 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13193?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-13193.
--
Resolution: Fixed

> Replica manager doesn't update partition state when transitioning from leader 
> to follower with unknown leader
> -
>
> Key: KAFKA-13193
> URL: https://issues.apache.org/jira/browse/KAFKA-13193
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft, replication
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>  Labels: kip-500
>
> This issue applies to both the ZK and KRaft implementations of the replica 
> manager. In the rare case when a replica transitions from leader to follower 
> with no leader, the partition state is not updated.
> This is because when handling makeFollowers, the ReplicaManager only updates 
> the partition state if the leader is alive. The solution is to always 
> transition to follower but not start the fetcher thread if the leader is 
> unknown or not alive.
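A simplified sketch of the proposed fix, using hypothetical `PartitionState` and `FetcherManager` types rather than the real `ReplicaManager` code: the partition always records the transition to follower, and only fetcher startup is conditional on a known, live leader.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

public class MakeFollowerSketch {
    // Hypothetical per-partition state kept by the replica manager.
    static final class PartitionState {
        boolean isLeader = true;
        Optional<Integer> leaderId = Optional.empty();
    }

    // Hypothetical fetcher bookkeeping: tracks which partitions are being fetched.
    static final class FetcherManager {
        final Set<String> fetching = new HashSet<>();
        void start(String partition, int leaderId) { fetching.add(partition); }
        void stop(String partition) { fetching.remove(partition); }
    }

    static void makeFollower(String partition, PartitionState state, Optional<Integer> newLeader,
                             Set<Integer> aliveBrokers, FetcherManager fetchers) {
        // Always update the partition state, even if the leader is unknown or not alive.
        state.isLeader = false;
        state.leaderId = newLeader;
        fetchers.stop(partition);
        // Only start fetching when there is a live leader to fetch from.
        newLeader.filter(aliveBrokers::contains)
                 .ifPresent(leader -> fetchers.start(partition, leader));
    }

    public static void main(String[] args) {
        PartitionState state = new PartitionState();
        FetcherManager fetchers = new FetcherManager();
        makeFollower("foo-0", state, Optional.empty(), Set.of(1, 2), fetchers);
        System.out.println(state.isLeader + " " + fetchers.fetching); // false []
    }
}
```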





[GitHub] [kafka] jasonk000 commented on pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm

2022-02-04 Thread GitBox


jasonk000 commented on pull request #11721:
URL: https://github.com/apache/kafka/pull/11721#issuecomment-1030278393


   Thanks @ijuma, I believe I've sorted these out in 
[e891ebf](https://github.com/apache/kafka/pull/11721/commits/e891ebfd46ed97ab1c6face21f3d7f6565734a77).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jasonk000 commented on a change in pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm

2022-02-04 Thread GitBox


jasonk000 commented on a change in pull request #11721:
URL: https://github.com/apache/kafka/pull/11721#discussion_r799747533



##
File path: 
clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java
##
@@ -239,6 +239,66 @@ public void testDouble() throws IOException {
 assertDoubleSerde(Double.NEGATIVE_INFINITY, 0xFFF0000000000000L);
 }
 
+private static int mathSizeOfUnsignedVarint(int value) {
+int leadingZeros = Integer.numberOfLeadingZeros(value);
+// return (38 - leadingZeros) / 7 + leadingZeros / 32;
+int leadingZerosBelow38DividedBy7 = ((38 - leadingZeros) * 0b10010010010010011) >>> 19;
+return leadingZerosBelow38DividedBy7 + (leadingZeros >>> 5);
+}
+
+@Test
+public void testSizeOfUnsignedVarintMath() {
+for (int i = 0; i < Integer.MAX_VALUE; i++) {

Review comment:
   The tests I have now run in ~0.6 seconds.
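For readers following the review, here is a self-contained sketch that restates the branch-free formula from the diff and checks it against the loop-based version. The division by 7 is replaced by a multiply and shift: `0b10010010010010011` is 74899, roughly 2^19 / 7, so `(x * 74899) >>> 19` equals `x / 7` for the values of `x` that occur here (at most 38). The class name and the sampled values are illustrative; unlike the PR's test, this does not sweep every int.

```java
public class VarintSizeSketch {
    // Loop-based reference: one byte per 7 bits of payload, at least one byte.
    static int loopSizeOfUnsignedVarint(int value) {
        int bytes = 1;
        while ((value & 0xffffff80) != 0) {
            bytes += 1;
            value >>>= 7;
        }
        return bytes;
    }

    // Branch-free version from the diff: (38 - lz) / 7 + lz / 32,
    // with the division by 7 done as a multiply by 74899 and a shift by 19.
    static int mathSizeOfUnsignedVarint(int value) {
        int leadingZeros = Integer.numberOfLeadingZeros(value);
        int leadingZerosBelow38DividedBy7 = ((38 - leadingZeros) * 0b10010010010010011) >>> 19;
        // lz / 32 is 1 only for value == 0, which still needs one byte.
        return leadingZerosBelow38DividedBy7 + (leadingZeros >>> 5);
    }

    public static void main(String[] args) {
        int[] samples = {0, 1, 127, 128, 16_383, 16_384, Integer.MAX_VALUE, -1, Integer.MIN_VALUE};
        for (int v : samples) {
            int expected = loopSizeOfUnsignedVarint(v);
            int actual = mathSizeOfUnsignedVarint(v);
            if (expected != actual) {
                throw new AssertionError("mismatch for " + v + ": " + expected + " vs " + actual);
            }
        }
        // Every bit length from 0 to 32 is covered by checking 2^k - 1 and 2^k.
        for (int k = 0; k < 32; k++) {
            int v = 1 << k;
            if (loopSizeOfUnsignedVarint(v) != mathSizeOfUnsignedVarint(v)
                    || loopSizeOfUnsignedVarint(v - 1) != mathSizeOfUnsignedVarint(v - 1)) {
                throw new AssertionError("mismatch near 2^" + k);
            }
        }
        System.out.println("all sizes agree");
    }
}
```

Checking each bit length rather than every int keeps the check fast, since the encoded size depends only on the position of the highest set bit.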








[GitHub] [kafka] jasonk000 commented on a change in pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm

2022-02-04 Thread GitBox


jasonk000 commented on a change in pull request #11721:
URL: https://github.com/apache/kafka/pull/11721#discussion_r799748066



##
File path: 
clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java
##
@@ -239,6 +239,66 @@ public void testDouble() throws IOException {
 assertDoubleSerde(Double.NEGATIVE_INFINITY, 0xFFF0000000000000L);
 }
 
+private static int mathSizeOfUnsignedVarint(int value) {
+int leadingZeros = Integer.numberOfLeadingZeros(value);
+// return (38 - leadingZeros) / 7 + leadingZeros / 32;
+int leadingZerosBelow38DividedBy7 = ((38 - leadingZeros) * 0b10010010010010011) >>> 19;
+return leadingZerosBelow38DividedBy7 + (leadingZeros >>> 5);
+}
+
+@Test
+public void testSizeOfUnsignedVarintMath() {
+for (int i = 0; i < Integer.MAX_VALUE; i++) {
+final int actual = mathSizeOfUnsignedVarint(i);
+final int expected = oldSizeOfUnsignedVarint(i);
+assertEquals(expected, actual);
+}
+}
+
+/**
+ * The old well-known implementation for sizeOfUnsignedVarint
+ */
+private static int oldSizeOfUnsignedVarint(int value) {

Review comment:
   I nested these inside the test case, which I think is cleaner.








[GitHub] [kafka] dajac commented on pull request #11543: Update release.py

2022-02-04 Thread GitBox


dajac commented on pull request #11543:
URL: https://github.com/apache/kafka/pull/11543#issuecomment-1030295682


   @shharrnam Thanks for the PR. Could you explain why we would need to add 
this new line? That does not seem necessary to me.






[GitHub] [kafka] dajac closed pull request #11594: How to define the quantity of consumption groups

2022-02-04 Thread GitBox


dajac closed pull request #11594:
URL: https://github.com/apache/kafka/pull/11594


   






[GitHub] [kafka] dajac commented on pull request #11594: How to define the quantity of consumption groups

2022-02-04 Thread GitBox


dajac commented on pull request #11594:
URL: https://github.com/apache/kafka/pull/11594#issuecomment-1030299168


   @ayu-programer It depends on your use case. You could consume all the topics 
from the same group if the application needs to process all of them together. 
Otherwise, you could, for instance, define one group per use case and subscribe 
only to the topics that use case requires. I hope this helps.

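As a rough illustration of the per-use-case option, the sketch below builds one set of consumer properties per use case, each with its own `group.id` and topic list. The use-case names, topic names, and bootstrap address are made up. In a real application, each `Properties` object would be passed to a `KafkaConsumer`, which would then `subscribe` to that use case's topics; only standard-library types are used here so the sketch runs without a broker.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class GroupPerUseCaseSketch {
    // Builds consumer settings for one use case: a dedicated consumer group.
    static Properties consumerProps(String useCase) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical address
        props.put("group.id", useCase + "-group");         // one consumer group per use case
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical mapping of use cases to the only topics each one needs.
        Map<String, List<String>> topicsByUseCase = new LinkedHashMap<>();
        topicsByUseCase.put("billing", List.of("orders", "payments"));
        topicsByUseCase.put("search-indexing", List.of("products"));

        for (Map.Entry<String, List<String>> e : topicsByUseCase.entrySet()) {
            Properties props = consumerProps(e.getKey());
            // With kafka-clients on the classpath, this is where one would create a
            // KafkaConsumer from props and subscribe it to e.getValue().
            System.out.println(props.getProperty("group.id") + " -> " + e.getValue());
        }
    }
}
```

Separate groups track their offsets independently, so each use case can consume the same topic at its own pace.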





[GitHub] [kafka] dajac commented on pull request #11353: KAFKA-13322: Reducing amount of garbage that gets generated during a poll operation

2022-02-04 Thread GitBox


dajac commented on pull request #11353:
URL: https://github.com/apache/kafka/pull/11353#issuecomment-1030306522


   @mprusakov Are you still interested in doing this? I can help review the 
PR once the existing comments are addressed. Thanks.






[GitHub] [kafka] dajac commented on pull request #10562: MINOR: Update tests to include the 2.8.0 release

2022-02-04 Thread GitBox


dajac commented on pull request #10562:
URL: https://github.com/apache/kafka/pull/10562#issuecomment-1030315113


   Closing as this was done by another PR.






[GitHub] [kafka] dajac closed pull request #10562: MINOR: Update tests to include the 2.8.0 release

2022-02-04 Thread GitBox


dajac closed pull request #10562:
URL: https://github.com/apache/kafka/pull/10562


   






[GitHub] [kafka] dajac commented on pull request #9963: MINOR: Extract ApiVersions logic from the `SocketServer` to the `KafkaApis`

2022-02-04 Thread GitBox


dajac commented on pull request #9963:
URL: https://github.com/apache/kafka/pull/9963#issuecomment-1030320056


   Closing.






[GitHub] [kafka] dajac closed pull request #9963: MINOR: Extract ApiVersions logic from the `SocketServer` to the `KafkaApis`

2022-02-04 Thread GitBox


dajac closed pull request #9963:
URL: https://github.com/apache/kafka/pull/9963


   






[GitHub] [kafka] dajac closed pull request #9402: KAFKA-10588 update console consumer arguments for KIP-629

2022-02-04 Thread GitBox


dajac closed pull request #9402:
URL: https://github.com/apache/kafka/pull/9402


   






[GitHub] [kafka] dajac commented on pull request #9402: KAFKA-10588 update console consumer arguments for KIP-629

2022-02-04 Thread GitBox


dajac commented on pull request #9402:
URL: https://github.com/apache/kafka/pull/9402#issuecomment-1030324045


   This was done by https://github.com/apache/kafka/pull/11008. Closing it.






[GitHub] [kafka] dajac commented on pull request #9289: Dev/pasriva/json acl

2022-02-04 Thread GitBox


dajac commented on pull request #9289:
URL: https://github.com/apache/kafka/pull/9289#issuecomment-1030325227


   I am not sure what this PR is doing. It seems to be a mistake. Closing it.






[GitHub] [kafka] dajac closed pull request #9289: Dev/pasriva/json acl

2022-02-04 Thread GitBox


dajac closed pull request #9289:
URL: https://github.com/apache/kafka/pull/9289


   






[GitHub] [kafka] dajac closed pull request #9085: MINOR: Support java.util.Optional in the auto-generated protocol

2022-02-04 Thread GitBox


dajac closed pull request #9085:
URL: https://github.com/apache/kafka/pull/9085


   






[GitHub] [kafka] dajac commented on pull request #9085: MINOR: Support java.util.Optional in the auto-generated protocol

2022-02-04 Thread GitBox


dajac commented on pull request #9085:
URL: https://github.com/apache/kafka/pull/9085#issuecomment-1030326392


   PR is outdated. Closing it.






[GitHub] [kafka] dajac closed pull request #7456: KAFKA-8997; Make Errors a first class type in the auto-generated protocol.

2022-02-04 Thread GitBox


dajac closed pull request #7456:
URL: https://github.com/apache/kafka/pull/7456


   






[GitHub] [kafka] jasonk000 commented on a change in pull request #11722: KAFKA-13630: reduce amount of time that producer network thread holds batch queue lock

2022-02-04 Thread GitBox


jasonk000 commented on a change in pull request #11722:
URL: https://github.com/apache/kafka/pull/11722#discussion_r799796771



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##
@@ -448,40 +448,48 @@ public ReadyCheckResult ready(Cluster cluster, long nowMs) {
 boolean exhausted = this.free.queued() > 0;
 for (Map.Entry> entry : this.batches.entrySet()) {
 Deque deque = entry.getValue();
+
+final ProducerBatch batch;
+final long waitedTimeMs;
+final boolean backingOff;
+final boolean full;
+
+// Collect as little as possible inside critical region, determine outcome after release
 synchronized (deque) {
-// When producing to a large number of partitions, this path is hot and deques are often empty.
-// We check whether a batch exists first to avoid the more expensive checks whenever possible.

Review comment:
   I attempted to capture the sentiment with this comment, given the 
re-flowed logic:
   ```
   // Collect as little as possible inside critical region, determine outcome after release
   ```
   I'll add something more descriptive.
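The pattern under discussion can be sketched outside Kafka: read only what is needed while holding the deque's monitor, then evaluate the readiness decision after releasing it. The `Batch` class, the `lingerMs` and `retryBackoffMs` parameters, and the simplified rules are hypothetical stand-ins for `ProducerBatch` and the real `ready()` logic. The early return for an empty deque reflects the point that deques are often empty when producing to many partitions.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class NarrowLockSketch {
    // Hypothetical, immutable stand-in for ProducerBatch.
    static final class Batch {
        final long lastAttemptMs;
        final int attempts;
        final boolean full;
        Batch(long lastAttemptMs, int attempts, boolean full) {
            this.lastAttemptMs = lastAttemptMs;
            this.attempts = attempts;
            this.full = full;
        }
    }

    static boolean isReady(Deque<Batch> deque, long nowMs, long lingerMs, long retryBackoffMs) {
        final Batch batch;
        final boolean full;
        // Critical region: snapshot only the head batch and the queue depth.
        synchronized (deque) {
            batch = deque.peekFirst();
            if (batch == null) {
                return false; // common case when producing to many partitions
            }
            full = deque.size() > 1 || batch.full;
        }
        // Outside the lock: decide from the snapshot (Batch is immutable here, so this is safe).
        long waitedTimeMs = Math.max(0, nowMs - batch.lastAttemptMs);
        boolean backingOff = batch.attempts > 0 && waitedTimeMs < retryBackoffMs;
        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
        boolean expired = waitedTimeMs >= timeToWaitMs;
        return (full || expired) && !backingOff;
    }

    public static void main(String[] args) {
        Deque<Batch> deque = new ArrayDeque<>();
        System.out.println(isReady(deque, 1_000, 5, 100)); // false: empty deque
        deque.add(new Batch(990, 0, false));
        System.out.println(isReady(deque, 1_000, 5, 100)); // true: waited 10 ms >= linger 5 ms
    }
}
```

In the real accumulator, batch fields can change concurrently, so which values are safe to read after releasing the lock has to be checked case by case.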








[GitHub] [kafka] mprusakov closed pull request #11353: KAFKA-13322: Reducing amount of garbage that gets generated during a poll operation

2022-02-04 Thread GitBox


mprusakov closed pull request #11353:
URL: https://github.com/apache/kafka/pull/11353


   






[GitHub] [kafka] mprusakov commented on pull request #11353: KAFKA-13322: Reducing amount of garbage that gets generated during a poll operation

2022-02-04 Thread GitBox


mprusakov commented on pull request #11353:
URL: https://github.com/apache/kafka/pull/11353#issuecomment-1030340572


   Unfortunately, I have realized that this fix is a drop in the ocean and that 
further changes would be much more intrusive. This change alone did not meet my 
client's requirements, so I've implemented my own version using your lower-level 
API; there is therefore no need for this change.






[jira] [Resolved] (KAFKA-13322) Java client produces a large amount of garbage during a poll

2022-02-04 Thread Michail (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michail resolved KAFKA-13322.
-
Resolution: Won't Fix

Having done the analysis, I found that the changes required to achieve this are 
far too intrusive, so I decided not to proceed with this.

> Java client produces a large amount of garbage during a poll
> 
>
> Key: KAFKA-13322
> URL: https://issues.apache.org/jira/browse/KAFKA-13322
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 3.0.0
>Reporter: Michail
>Priority: Minor
>  Labels: new-rebalance-should-fix
>
> The Java Kafka consumer creates multiple collections during a single poll 
> command: in my test system I have a consumer that polls a topic with 100 
> partitions, and even though no messages are coming through, the code allocates 
> around 100M per 5 minutes.
>  
> I've investigated the allocations, and the biggest ones can easily be avoided 
> by moving them to the instance level, something that can be done because 
> KafkaConsumer is not thread-safe. The purpose of this Jira is to get rid of most 
> of them by applying either this or a similar approach.

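A minimal sketch of the approach the description proposes, with hypothetical names (`PollScratchSketch`, `collectFetchable`) rather than actual `KafkaConsumer` internals. Because a consumer instance is used from a single thread, a scratch collection can live in an instance field and be cleared on each poll instead of being reallocated. The trade-off is that callers must not hold on to the returned view across calls.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class PollScratchSketch {
    // Reused across calls; safe only because the owning object is single-threaded.
    private final List<String> fetchablePartitions = new ArrayList<>();
    // Read-only view created once; it reflects later changes to the backing list.
    private final List<String> fetchableView = Collections.unmodifiableList(fetchablePartitions);

    // Returns the reused view; its contents are only valid until the next call.
    List<String> collectFetchable(List<String> assigned, List<String> paused) {
        fetchablePartitions.clear(); // no new collection allocated on the hot path
        for (String tp : assigned) {
            if (!paused.contains(tp)) {
                fetchablePartitions.add(tp);
            }
        }
        return fetchableView;
    }

    public static void main(String[] args) {
        PollScratchSketch scratch = new PollScratchSketch();
        List<String> assigned = List.of("t-0", "t-1", "t-2");
        List<String> first = scratch.collectFetchable(assigned, List.of("t-1"));
        System.out.println(first); // [t-0, t-2]
        List<String> second = scratch.collectFetchable(assigned, List.of());
        System.out.println(first == second); // true: same object reused
    }
}
```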




[GitHub] [kafka] hachikuji commented on a change in pull request #11667: MINOR; Enable Kraft in ApiVersionTest

2022-02-04 Thread GitBox


hachikuji commented on a change in pull request #11667:
URL: https://github.com/apache/kafka/pull/11667#discussion_r799823851



##
File path: core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
##
@@ -17,41 +17,35 @@
 
 package kafka.server
 
-import kafka.test.{ClusterConfig, ClusterInstance}
 import org.apache.kafka.common.message.ApiVersionsRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.ApiVersionsRequest
-import kafka.test.annotation.ClusterTest
-import kafka.test.junit.ClusterTestExtensions
+import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.BeforeEach
-import org.junit.jupiter.api.extension.ExtendWith
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 
-@ExtendWith(value = Array(classOf[ClusterTestExtensions]))

Review comment:
   I think `ClusterTestExtensions` was intended to test both modes. Was it 
not working? 








[GitHub] [kafka] hachikuji commented on pull request #11685: Update dependencies.gradle

2022-02-04 Thread GitBox


hachikuji commented on pull request #11685:
URL: https://github.com/apache/kafka/pull/11685#issuecomment-1030368363


   @shubhamsingh002 Can you clarify your intent here? The upgrade to log4j 2 is 
not compatible, and as far as I know we are not planning to release from 0.10 
in any case.






[GitHub] [kafka] ijuma commented on a change in pull request #11722: KAFKA-13630: reduce amount of time that producer network thread holds batch queue lock

2022-02-04 Thread GitBox


ijuma commented on a change in pull request #11722:
URL: https://github.com/apache/kafka/pull/11722#discussion_r799828526



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##
@@ -448,40 +448,48 @@ public ReadyCheckResult ready(Cluster cluster, long nowMs) {
 boolean exhausted = this.free.queued() > 0;
 for (Map.Entry> entry : this.batches.entrySet()) {
 Deque deque = entry.getValue();
+
+final ProducerBatch batch;
+final long waitedTimeMs;
+final boolean backingOff;
+final boolean full;
+
+// Collect as little as possible inside critical region, determine outcome after release
 synchronized (deque) {
-// When producing to a large number of partitions, this path is hot and deques are often empty.
-// We check whether a batch exists first to avoid the more expensive checks whenever possible.

Review comment:
   I think the useful context that was removed is:
   
   > When producing to a large number of partitions, this path is hot and deques are often empty.








[GitHub] [kafka] hachikuji commented on pull request #11646: KAFKA-13566: producer exponential backoff implementation for KIP-580

2022-02-04 Thread GitBox


hachikuji commented on pull request #11646:
URL: https://github.com/apache/kafka/pull/11646#issuecomment-1030373051


   @showuon Thanks for the PR and apologies for the delay. It looks to me like 
this is covering both the consumer and producer? Would it be possible to 
separate into two PRs?






[GitHub] [kafka] hachikuji closed pull request #11685: Update dependencies.gradle

2022-02-04 Thread GitBox


hachikuji closed pull request #11685:
URL: https://github.com/apache/kafka/pull/11685


   






[GitHub] [kafka] hachikuji commented on pull request #11685: Update dependencies.gradle

2022-02-04 Thread GitBox


hachikuji commented on pull request #11685:
URL: https://github.com/apache/kafka/pull/11685#issuecomment-1030374498


   I am going to go ahead and close this. Please feel free to reopen after you 
have updated the description to explain what you are trying to do.






[GitHub] [kafka] hachikuji commented on a change in pull request #11620: MINOR: check for raft threads in verifyNoUnexpectedThreads

2022-02-04 Thread GitBox


hachikuji commented on a change in pull request #11620:
URL: https://github.com/apache/kafka/pull/11620#discussion_r799835234



##
File path: core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
##
@@ -340,6 +340,7 @@ abstract class QuorumTestHarness extends Logging {
 
 object QuorumTestHarness {
   val ZkClientEventThreadSuffix = "-EventThread"
+  val RaftClientThreadPrefix = "raft"

Review comment:
   Was there a plan to use this?
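If the constant were to be used, one possibility is a check along these lines: list live threads whose names start with a given prefix and fail if any remain. This is a hedged sketch, not the harness's actual `verifyNoUnexpectedThreads`; the helper names are hypothetical and only the prefix value `"raft"` comes from the diff. It is written in Java for consistency with the other examples here, although the harness itself is Scala.

```java
import java.util.List;
import java.util.stream.Collectors;

public class ThreadLeakCheckSketch {
    static final String RAFT_CLIENT_THREAD_PREFIX = "raft";

    // Names of all live threads whose name starts with the given prefix.
    static List<String> liveThreadsWithPrefix(String prefix) {
        return Thread.getAllStackTraces().keySet().stream()
            .filter(Thread::isAlive)
            .map(Thread::getName)
            .filter(name -> name.startsWith(prefix))
            .sorted()
            .collect(Collectors.toList());
    }

    // Fails if any thread with the prefix is still running.
    static void verifyNoThreadsWithPrefix(String prefix) {
        List<String> leaked = liveThreadsWithPrefix(prefix);
        if (!leaked.isEmpty()) {
            throw new AssertionError("Found unexpected threads: " + leaked);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(() -> {
            try { Thread.sleep(200); } catch (InterruptedException ignored) { }
        }, RAFT_CLIENT_THREAD_PREFIX + "-io-thread");
        t.start();
        // Lists the raft thread while it is still running.
        System.out.println(liveThreadsWithPrefix(RAFT_CLIENT_THREAD_PREFIX));
        t.join();
        verifyNoThreadsWithPrefix(RAFT_CLIENT_THREAD_PREFIX); // passes once the thread has exited
    }
}
```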








[GitHub] [kafka] ijuma commented on a change in pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm

2022-02-04 Thread GitBox


ijuma commented on a change in pull request #11721:
URL: https://github.com/apache/kafka/pull/11721#discussion_r799833387



##
File path: clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
##
@@ -413,15 +424,20 @@ public static int sizeOfVarint(int value) {
  * Number of bytes needed to encode a long in variable-length format.
  *
  * @param value The signed value
+ * @see #sizeOfUnsignedVarint(int)
  */
 public static int sizeOfVarlong(long value) {
 long v = (value << 1) ^ (value >> 63);
-int bytes = 1;
-while ((v & 0xffffffffffffff80L) != 0L) {
-bytes += 1;
-v >>>= 7;
-}
-return bytes;
+
+// For implementation notes @see #sizeOfUnsignedVarint(int)
+
+// Similar logic is applied to allow for 64bit input -> 1-10byte output.
+
+// return (70 - leadingZeros) / 7 + leadingZeros / 64;

Review comment:
   Nit: not sure we need the empty lines between comments here, seems 
pretty readable without them.

##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.util;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.common.utils.ByteUtils;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+@State(Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ByteUtilsBenchmark {
+private int input;
+
+@Setup(Level.Iteration)
+public void setUp() {
+input = ThreadLocalRandom.current().nextInt(2 * 1024 * 1024);
+}
+
+@Benchmark
+@Fork(3)
+@Warmup(iterations = 5, time = 1)
+@Measurement(iterations = 10, time = 1)
+public int testSizeOfUnsignedVarint() {
+return ByteUtils.sizeOfUnsignedVarint(input);
+}
+
+@Benchmark
+@Fork(3)
+@Warmup(iterations = 5, time = 1)
+@Measurement(iterations = 10, time = 1)
+public int testSizeOfUnsignedVarintSimple() {
+int value = input;
+int bytes = 1;
+while ((value & 0xffffff80) != 0L) {
+bytes += 1;
+value >>>= 7;
+}
+return bytes;
+}
+
+@Benchmark
+@Fork(3)
+@Warmup(iterations = 5, time = 1)
+@Measurement(iterations = 10, time = 1)
+public int testSizeOfVarlong() {

Review comment:
   I was going to ask why we're using the `test` prefix for a benchmark, 
but then I realized that many of the kafka benchmarks do that and I somehow 
didn't notice. :) Given that, it seems fine to leave it like this for now.

##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.

[GitHub] [kafka] ijuma commented on a change in pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm

2022-02-04 Thread GitBox


ijuma commented on a change in pull request #11721:
URL: https://github.com/apache/kafka/pull/11721#discussion_r799833387



##
File path: clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
##
@@ -413,15 +424,20 @@ public static int sizeOfVarint(int value) {
  * Number of bytes needed to encode a long in variable-length format.
  *
  * @param value The signed value
+ * @see #sizeOfUnsignedVarint(int)
  */
 public static int sizeOfVarlong(long value) {
 long v = (value << 1) ^ (value >> 63);
-int bytes = 1;
-while ((v & 0xffffffffffffff80L) != 0L) {
-bytes += 1;
-v >>>= 7;
-}
-return bytes;
+
+// For implementation notes @see #sizeOfUnsignedVarint(int)
+
+// Similar logic is applied to allow for 64bit input -> 1-10 byte output.
+
+// return (70 - leadingZeros) / 7 + leadingZeros / 64;

Review comment:
   Nit: not sure we need the empty lines between comments here, seems 
pretty readable without them.
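The commented-out formula can be checked against the loop it replaces. This is a minimal sketch with hypothetical names. It applies the same ZigZag step as `sizeOfVarlong`, then `(70 - leadingZeros) / 7 + leadingZeros / 64` with plain division; it is not necessarily how the PR implements it. Because ceil(64 / 7) = 10, a 64-bit input can need up to 10 bytes.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper for illustration; not Kafka's ByteUtils API.
final class VarlongSizeSketch {
    private VarlongSizeSketch() { }

    // ZigZag step (same expression as sizeOfVarlong): maps small negative
    // numbers to small unsigned ones, e.g. 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3.
    static long zigZag(long value) {
        return (value << 1) ^ (value >> 63);
    }

    // Reference loop: one byte per 7 payload bits, at least one byte.
    static int sizeOfVarlongLoop(long value) {
        long v = zigZag(value);
        int bytes = 1;
        while ((v & 0xffffffffffffff80L) != 0L) {
            bytes += 1;
            v >>>= 7;
        }
        return bytes;
    }

    // Leading-zeros form: ceil((64 - lz) / 7) == (70 - lz) / 7, plus lz / 64
    // so that a zero input (64 leading zeros) still takes one byte.
    // Results range from 1 to 10 bytes.
    static int sizeOfVarlongLeadingZeros(long value) {
        int leadingZeros = Long.numberOfLeadingZeros(zigZag(value));
        return (70 - leadingZeros) / 7 + leadingZeros / 64;
    }

    public static void main(String[] args) {
        long[] edges = {0L, -1L, 1L, 63L, -64L, 64L, Long.MAX_VALUE, Long.MIN_VALUE};
        for (long v : edges) {
            check(v);
        }
        for (int i = 0; i < 1_000_000; i++) {
            check(ThreadLocalRandom.current().nextLong());
        }
        System.out.println("loop and leading-zeros forms agree");
    }

    private static void check(long v) {
        int loop = sizeOfVarlongLoop(v);
        int lz = sizeOfVarlongLeadingZeros(v);
        if (loop != lz) {
            throw new AssertionError("mismatch for " + v + ": " + loop + " vs " + lz);
        }
    }
}
```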




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #11688: KAFKA-13435; Static membership protocol should let the leader skip assignment (KIP-814)

2022-02-04 Thread GitBox


hachikuji commented on a change in pull request #11688:
URL: https://github.com/apache/kafka/pull/11688#discussion_r799847085



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -696,8 +698,12 @@ public void handle(JoinGroupResponse joinResponse, RequestFuture fut
 private RequestFuture onJoinLeader(JoinGroupResponse joinResponse) {
 try {
 // perform the leader synchronization and send back the assignment for the group
-Map groupAssignment = performAssignment(joinResponse.data().leader(), joinResponse.data().protocolName(),
-joinResponse.data().members());
+Map groupAssignment = performAssignment(
+joinResponse.data().leader(),
+joinResponse.data().protocolName(),
+joinResponse.data().members(),
+joinResponse.data().skipAssignment()

Review comment:
   I think a comment about this would be helpful. An obvious question is 
why we still call `performAssignment` when `skipAssignment` is set. It's 
useful to remember that we still need to propagate the leader and member state 
to the coordinator implementation.
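To illustrate the point about propagating state, here is a minimal sketch of a leader-side `performAssignment`. It records the leader and member list before short-circuiting when `skipAssignment` is set. Everything here is a simplified, hypothetical model: it uses String member ids and String assignments instead of the real metadata and assignment types. Returning an empty map on the skip path mirrors the diff. The sketch assumes the group then keeps its existing assignment.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified leader-side coordinator (not Kafka's classes).
class LeaderStateSketch {
    boolean isLeader = false;
    String leaderId;
    Set<String> knownMembers = new HashSet<>();

    Map<String, String> performAssignment(String leaderId,
                                          String protocol,
                                          List<String> memberIds,
                                          boolean skipAssignment) {
        // Always record leader and member state first: even a static leader
        // that skips the assignor still needs it (e.g. to notice later
        // metadata changes that call for a rebalance).
        this.isLeader = true;
        this.leaderId = leaderId;
        this.knownMembers = new HashSet<>(memberIds);

        if (skipAssignment) {
            // Assumption: the group keeps its existing assignment, so there
            // is nothing new to send back.
            return Collections.emptyMap();
        }

        // Placeholder "assignor": tag each member with the chosen protocol.
        Map<String, String> assignment = new HashMap<>();
        for (String member : memberIds) {
            assignment.put(member, protocol + "-assignment-for-" + member);
        }
        return assignment;
    }

    public static void main(String[] args) {
        LeaderStateSketch leader = new LeaderStateSketch();
        Map<String, String> result = leader.performAssignment(
            "member-1", "range", Arrays.asList("member-1", "member-2"), true);
        // prints "true true 2": empty result, but leader state was recorded
        System.out.println(result.isEmpty() + " " + leader.isLeader + " " + leader.knownMembers.size());
    }
}
```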

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
##
@@ -211,7 +211,15 @@ protected void onJoinComplete(int generation, String memberId, String protocol,
 }
 
 @Override
-protected Map performAssignment(String leaderId, String protocol, List allMemberMetadata) {
+protected Map performAssignment(String leaderId,
+String protocol,
+List allMemberMetadata,
+Boolean skipAssignment) {
+// Connect does not support static membership so skipping the
+// assignment should never happen in practice.
+if (skipAssignment)
+return Collections.emptyMap();

Review comment:
   Would it make sense to raise an exception instead?
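One way to act on this suggestion is to fail fast with an `IllegalStateException` rather than return an empty map. The sketch assumes a Connect leader should never see `skipAssignment` set. The class name, protocol string and message are hypothetical, and the non-skip branch is only a placeholder for the real assignor.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; not Connect's WorkerCoordinator.
class FailFastAssignmentSketch {
    Map<String, String> performAssignment(String leaderId,
                                          String protocol,
                                          List<String> memberIds,
                                          boolean skipAssignment) {
        if (skipAssignment) {
            // Static membership is not used here, so this should never
            // happen; surface it loudly instead of returning an empty map.
            throw new IllegalStateException("Leader " + leaderId
                + " was asked to skip assignment, but static membership is not supported");
        }
        // Placeholder for the real assignor: one entry per member.
        Map<String, String> assignment = new HashMap<>();
        for (String member : memberIds) {
            assignment.put(member, protocol);
        }
        return assignment;
    }

    public static void main(String[] args) {
        FailFastAssignmentSketch coordinator = new FailFastAssignmentSketch();
        System.out.println(coordinator.performAssignment(
            "worker-1", "proto", Arrays.asList("worker-1", "worker-2"), false).size());
        try {
            coordinator.performAssignment("worker-1", "proto", Arrays.asList("worker-1"), true);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The sketch takes a primitive `boolean`. With the boxed `Boolean` parameter in the diff, passing `null` would make `if (skipAssignment)` throw a `NullPointerException` when it is unboxed.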

##
File path: core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
##
@@ -1791,4 +1793,33 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 assertTrue(records2.count() == 1 && records2.records(tp).asScala.head.offset == 1,
   "Expected consumer2 to consume one message from offset 1, which is the committed offset of consumer1")
   }
+  @Test

Review comment:
   nit: add newline

##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
##
@@ -1550,6 +1573,34 @@ public void testMetadataChangeTriggersRebalance() {
 assertTrue(coordinator.rejoinNeededOrPending());
 }
 
+@Test
+public void testStaticLeaderRejoinsGroupAndCanTriggersRebalance() {
+// ensure metadata is up-to-date for leader
+subscriptions.subscribe(singleton(topic1), rebalanceListener);
+client.updateMetadata(metadataResponse);
+
+client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+// the leader is responsible for picking up metadata changes and forcing a group rebalance.
+// note that `partitionAssignor.prepare` is not called therefore calling `partitionAssignor.assign`
+// will throw an IllegalStateException. this indirectly verifies that `assign` is correctly skipped.
+Map> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));

Review comment:
   Could we have a case where the other consumers are subscribed to a topic 
that this consumer is not also subscribed to?
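This case matters because a leader works with the union of every member's subscriptions, not just its own. The `ConsumerCoordinator` diff in this PR calls `updateGroupSubscription(allSubscribedTopics)`, and that union can include topics the leader itself does not consume. Below is a minimal sketch of computing the union, with a hypothetical helper name and plain topic-name strings.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: the set of topics the group as a whole subscribes to.
final class GroupSubscriptionSketch {
    private GroupSubscriptionSketch() { }

    static Set<String> allSubscribedTopics(Map<String, List<String>> memberSubscriptions) {
        Set<String> topics = new TreeSet<>();  // sorted for stable output
        for (List<String> subscribed : memberSubscriptions.values()) {
            topics.addAll(subscribed);
        }
        return topics;
    }

    public static void main(String[] args) {
        Map<String, List<String>> subs = new HashMap<>();
        subs.put("leader", Arrays.asList("topic1"));
        subs.put("follower", Arrays.asList("topic1", "topic2"));
        // prints [topic1, topic2]: topic2 is tracked even though the leader
        // does not subscribe to it
        System.out.println(allSubscribedTopics(subs));
    }
}
```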

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -642,6 +648,12 @@ private void maybeUpdateGroupSubscription(String assignorName,
 updateGroupSubscription(allSubscribedTopics);
 
 isLeader = true;
+assignmentSnapshot = metadataSnapshot;
+
+if (skipAssignment)
+return Collections.emptyMap();

Review comment:
   Can we add some logging for this case?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -198,11 +198,13 @@ public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig,
  * @param leaderId The id of the leader (which is this member)
  * @param protocol The protocol selected by the coordinator
  * @param allMemberMetadata Metadata from all members of the group
+ * @param skipAssignment True if leader must skip running the assignor
  * @return A map from each member to their state assignment
  */
 protected abstract Map performAssignment(String leaderId,

[GitHub] [kafka] lbradstreet commented on a change in pull request #11620: MINOR: check for raft threads in verifyNoUnexpectedThreads

2022-02-04 Thread GitBox


lbradstreet commented on a change in pull request #11620:
URL: https://github.com/apache/kafka/pull/11620#discussion_r799864255



##
File path: core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
##
@@ -340,6 +340,7 @@ abstract class QuorumTestHarness extends Logging {
 
 object QuorumTestHarness {
   val ZkClientEventThreadSuffix = "-EventThread"
+  val RaftClientThreadPrefix = "raft"

Review comment:
   I think I extracted this from another PR where I made a more substantial 
change. I'll figure out what I had in mind.
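For context, a check like `verifyNoUnexpectedThreads` can be approximated by listing live JVM threads whose names start with a prefix such as `"raft"`. The sketch below is a hypothetical Java helper built on `Thread.getAllStackTraces()`, and the thread name `raft-io-sketch` is made up. The real harness is Scala and may match thread names differently.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper in the spirit of verifyNoUnexpectedThreads.
final class ThreadLeakCheckSketch {
    private ThreadLeakCheckSketch() { }

    // Names of live threads whose name starts with the given prefix, sorted.
    static List<String> liveThreadsWithPrefix(String prefix) {
        return Thread.getAllStackTraces().keySet().stream()
            .filter(Thread::isAlive)
            .map(Thread::getName)
            .filter(name -> name.startsWith(prefix))
            .sorted()
            .collect(Collectors.toList());
    }

    public static void main(String[] args) throws InterruptedException {
        // Start a sleeping thread whose (hypothetical) name matches the prefix.
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "raft-io-sketch");
        t.setDaemon(true);
        t.start();
        System.out.println("while running: " + liveThreadsWithPrefix("raft"));
        t.interrupt();
        t.join();
        System.out.println("after join: " + liveThreadsWithPrefix("raft"));
    }
}
```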








[GitHub] [kafka] hachikuji opened a new pull request #11734: MINOR: Do not use optional args in `ProducerStateManager`

2022-02-04 Thread GitBox


hachikuji opened a new pull request #11734:
URL: https://github.com/apache/kafka/pull/11734


   We allowed `maxProducerIdExpirationMs` and `time` to be optional in the 
`ProducerStateManager` constructor. We generally frown on optional arguments 
since it is too easy to overlook them. In this case, I thought it was 
especially dangerous because `maxTransactionTimeoutMs` used the same type as 
`maxProducerIdExpirationMs`.
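Here is a sketch of why two same-typed parameters are risky, and of one way to make an accidental swap fail at compile time. The PR itself only makes the arguments required; the small wrapper types below are an additional, hypothetical step. `int` stands in for whatever type the real parameters use.

```java
// Hypothetical sketch; not Kafka's ProducerStateManager.
final class ProducerStateManagerSketch {
    final int maxTransactionTimeoutMs;
    final int maxProducerIdExpirationMs;

    // No defaults: every caller must pass both values, and because each has
    // its own wrapper type, passing them in the wrong order does not compile.
    ProducerStateManagerSketch(MaxTransactionTimeoutMs txnTimeout,
                               MaxProducerIdExpirationMs pidExpiration) {
        this.maxTransactionTimeoutMs = txnTimeout.value;
        this.maxProducerIdExpirationMs = pidExpiration.value;
    }

    public static void main(String[] args) {
        ProducerStateManagerSketch m = new ProducerStateManagerSketch(
            new MaxTransactionTimeoutMs(60_000),
            new MaxProducerIdExpirationMs(3_600_000));
        System.out.println(m.maxTransactionTimeoutMs + " " + m.maxProducerIdExpirationMs);
    }
}

// Hypothetical single-field wrapper types used only to give each setting a distinct type.
final class MaxTransactionTimeoutMs {
    final int value;
    MaxTransactionTimeoutMs(int value) { this.value = value; }
}

final class MaxProducerIdExpirationMs {
    final int value;
    MaxProducerIdExpirationMs(int value) { this.value = value; }
}
```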
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] hachikuji merged pull request #11694: MINOR: deleteHorizonMs update to documentation and DumpLogSegments tool

2022-02-04 Thread GitBox


hachikuji merged pull request #11694:
URL: https://github.com/apache/kafka/pull/11694


   






[GitHub] [kafka] jasonk000 commented on a change in pull request #11722: KAFKA-13630: reduce amount of time that producer network thread holds batch queue lock

2022-02-04 Thread GitBox


jasonk000 commented on a change in pull request #11722:
URL: https://github.com/apache/kafka/pull/11722#discussion_r799904384



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##
@@ -448,40 +448,48 @@ public ReadyCheckResult ready(Cluster cluster, long nowMs) {
 boolean exhausted = this.free.queued() > 0;
 for (Map.Entry> entry : this.batches.entrySet()) {
 Deque deque = entry.getValue();
+
+final ProducerBatch batch;
+final long waitedTimeMs;
+final boolean backingOff;
+final boolean full;
+
+// Collect as little as possible inside critical region, determine outcome after release
 synchronized (deque) {
-// When producing to a large number of partitions, this path is hot and deques are often empty.
-// We check whether a batch exists first to avoid the more expensive checks whenever possible.
-ProducerBatch batch = deque.peekFirst();
-if (batch != null) {
-TopicPartition part = entry.getKey();
-Node leader = cluster.leaderFor(part);
-if (leader == null) {
-// This is a partition for which leader is not known, but messages are available to send.
-// Note that entries are currently not removed from batches when deque is empty.
-unknownLeaderTopics.add(part.topic());
-} else if (!readyNodes.contains(leader) && !isMuted(part)) {
-long waitedTimeMs = batch.waitedTimeMs(nowMs);
-boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
-long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
-boolean full = deque.size() > 1 || batch.isFull();
-boolean expired = waitedTimeMs >= timeToWaitMs;
-boolean transactionCompleting = transactionManager != null && transactionManager.isCompleting();
-boolean sendable = full
-|| expired
-|| exhausted
-|| closed
-|| flushInProgress()
-|| transactionCompleting;
-if (sendable && !backingOff) {
-readyNodes.add(leader);
-} else {
-long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
-// Note that this results in a conservative estimate since an un-sendable partition may have
-// a leader that will later be found to have sendable data. However, this is good enough
-// since we'll just wake up and then sleep again for the remaining time.
-nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
-}
-}
+batch = deque.peekFirst();

Review comment:
   It is a small amount.
   
   Together, the two changes (this PR and #11721) reduce the CPU cost of the `send()` path by about 10% on a busy app, so the overall impact depends on the rest of the application.
   
   This application produces a high event rate, so about 1.5-2% of overall system CPU is saved. On a 1000-machine cluster, that works out to ~15-20 machines.
   
   Before
   
   
![image](https://user-images.githubusercontent.com/3196528/152620243-c93b69c7-390f-4ac3-a875-6536c255fc5d.png)
   
   
   After
   
   
![image](https://user-images.githubusercontent.com/3196528/152620252-e0742294-c08e-4b43-8ba0-221ced260f4a.png)
   








[GitHub] [kafka] jasonk000 commented on a change in pull request #11722: KAFKA-13630: reduce amount of time that producer network thread holds batch queue lock

2022-02-04 Thread GitBox


jasonk000 commented on a change in pull request #11722:
URL: https://github.com/apache/kafka/pull/11722#discussion_r799905219



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##
@@ -448,40 +448,48 @@ public ReadyCheckResult ready(Cluster cluster, long nowMs) {
 boolean exhausted = this.free.queued() > 0;
 for (Map.Entry> entry : this.batches.entrySet()) {
 Deque deque = entry.getValue();
+
+final ProducerBatch batch;
+final long waitedTimeMs;
+final boolean backingOff;
+final boolean full;
+
+// Collect as little as possible inside critical region, determine outcome after release
 synchronized (deque) {
-// When producing to a large number of partitions, this path is hot and deques are often empty.
-// We check whether a batch exists first to avoid the more expensive checks whenever possible.

Review comment:
   
[f15fdef](https://github.com/apache/kafka/pull/11722/commits/f15fdef3241a17f86af44c6dac04ba651d629426)
   
   Added:
   
   ```
   // This loop is especially hot with large partition counts.
   
   // We are careful to only perform the minimum required inside the
   // synchronized block, as this lock is also used to synchronize producer threads
   // attempting to append() to a partition/batch.
   
   synchronized (deque) {
   // Deques are often empty in this path, esp with large partition counts,
   // so we exit early if we can.
   ```
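The pattern described in the new comment can be sketched in isolation: copy the few fields you need while holding the deque's monitor, then decide after releasing it. This is a simplified, hypothetical model. `Batch` is not Kafka's `ProducerBatch`. The readiness rule keeps only the linger, backoff and fullness checks from the original loop. It omits leader lookup, muting, and the exhausted/closed/flush/transaction conditions.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified model of the ready() critical-section pattern.
final class ReadyCheckSketch {
    static final class Batch {
        final long createdMs;
        final int attempts;
        final boolean isFull;

        Batch(long createdMs, int attempts, boolean isFull) {
            this.createdMs = createdMs;
            this.attempts = attempts;
            this.isFull = isFull;
        }
    }

    // Returns true if the head batch of the deque is ready to send.
    static boolean isReady(Deque<Batch> deque, long nowMs, long lingerMs, long retryBackoffMs) {
        final long waitedTimeMs;
        final boolean backingOff;
        final boolean full;

        // Critical region: only read what is needed into locals.
        synchronized (deque) {
            Batch batch = deque.peekFirst();
            if (batch == null) {
                return false;  // deques are often empty, so exit early
            }
            waitedTimeMs = nowMs - batch.createdMs;
            backingOff = batch.attempts > 0 && waitedTimeMs < retryBackoffMs;
            full = deque.size() > 1 || batch.isFull;
        }

        // Lock released: the rest is pure computation on local copies, so
        // producer threads appending to this deque are not blocked by it.
        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
        boolean expired = waitedTimeMs >= timeToWaitMs;
        return (full || expired) && !backingOff;
    }

    public static void main(String[] args) {
        Deque<Batch> deque = new ArrayDeque<>();
        deque.add(new Batch(0L, 0, false));
        System.out.println(isReady(deque, 5L, 10L, 100L));   // false: still lingering
        System.out.println(isReady(deque, 10L, 10L, 100L));  // true: linger elapsed
    }
}
```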








[jira] [Updated] (KAFKA-12984) Cooperative sticky assignor can get stuck with invalid SubscriptionState input metadata

2022-02-04 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-12984:
---
Fix Version/s: 2.5.2
   2.6.2
   2.7.1

> Cooperative sticky assignor can get stuck with invalid SubscriptionState 
> input metadata
> ---
>
> Key: KAFKA-12984
> URL: https://issues.apache.org/jira/browse/KAFKA-12984
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 2.5.2, 2.7.1, 2.6.2, 2.8.1, 3.0.0
>
> Attachments: image-2021-10-25-11-53-40-221.png, 
> log-events-viewer-result-kafka.numbers, logs-insights-results-kafka.csv, 
> logs-insights-results-kafka.numbers
>
>
> Some users have reported seeing their consumer group become stuck in the 
> CompletingRebalance phase when using the cooperative-sticky assignor. Based 
> on the request metadata we were able to deduce that multiple consumers were 
> reporting the same partition(s) in their "ownedPartitions" field of the 
> consumer protocol. Since this is an invalid state, the input causes the 
> cooperative-sticky assignor to detect that something is wrong and throw an 
> IllegalStateException. If the consumer application is set up to simply retry, 
> this will cause the group to appear to hang in the rebalance state.
> The "ownedPartitions" field is encoded based on the ConsumerCoordinator's 
> SubscriptionState, which was assumed to always be up to date. However, there 
> may be cases where the consumer has dropped out of the group but fails to 
> clear the SubscriptionState, allowing it to report some partitions as owned 
> that have since been reassigned to another member.
> We should (a) fix the sticky assignment algorithm to resolve cases of 
> improper input conditions by invalidating the "ownedPartitions" in cases of 
> double ownership, and (b) shore up the ConsumerCoordinator logic to better 
> handle rejoining the group and keeping its internal state consistent. See 
> KAFKA-12983 for more details on (b).
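Fix (a) can be sketched as a pre-pass over the members' reported `ownedPartitions`. Any partition claimed by more than one member is dropped from every claim before assignment, instead of the assignor throwing. This is one hypothetical reading of "invalidating"; the real fix may resolve conflicts differently. Members and partitions are plain strings here, and the class name is made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sanitizer for double-owned partitions (not Kafka's assignor code).
final class OwnedPartitionsSanitizer {
    private OwnedPartitionsSanitizer() { }

    static Map<String, List<String>> dropDoubleOwned(Map<String, List<String>> ownedByMember) {
        // First pass: find partitions claimed more than once (a partition
        // listed twice by the same member is also treated as contested).
        Set<String> seen = new HashSet<>();
        Set<String> contested = new HashSet<>();
        for (List<String> owned : ownedByMember.values()) {
            for (String partition : owned) {
                if (!seen.add(partition)) {
                    contested.add(partition);
                }
            }
        }
        // Second pass: keep only uncontested claims for every member.
        Map<String, List<String>> result = new HashMap<>();
        for (Map.Entry<String, List<String>> e : ownedByMember.entrySet()) {
            List<String> kept = new ArrayList<>();
            for (String partition : e.getValue()) {
                if (!contested.contains(partition)) {
                    kept.add(partition);
                }
            }
            result.put(e.getKey(), kept);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> owned = new HashMap<>();
        owned.put("consumer-1", Arrays.asList("topic-0", "topic-1"));
        owned.put("consumer-2", Arrays.asList("topic-1", "topic-2"));
        Map<String, List<String>> clean = dropDoubleOwned(owned);
        System.out.println(clean.get("consumer-1"));  // [topic-0]
        System.out.println(clean.get("consumer-2"));  // [topic-2]
    }
}
```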



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-12984) Cooperative sticky assignor can get stuck with invalid SubscriptionState input metadata

2022-02-04 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-12984:
---
Fix Version/s: 2.7.2
   2.6.3
   (was: 2.7.1)
   (was: 2.6.2)

> Cooperative sticky assignor can get stuck with invalid SubscriptionState 
> input metadata
> ---
>
> Key: KAFKA-12984
> URL: https://issues.apache.org/jira/browse/KAFKA-12984
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 2.5.2, 2.6.3, 2.7.2, 2.8.1, 3.0.0
>





[GitHub] [kafka] ableegoldman commented on a change in pull request #11712: WIP: Put failed tasks to end of processing list

2022-02-04 Thread GitBox


ableegoldman commented on a change in pull request #11712:
URL: https://github.com/apache/kafka/pull/11712#discussion_r799919908



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##
@@ -270,6 +278,23 @@ Task task(final TaskId taskId) {
 return readOnlyActiveTasks;
 }
 
+List orderedActiveTasks() {
+return Collections.unmodifiableList(orderedActiveTasks);
+}
+
+void moveActiveTasksToTailFor(final String topologyName) {

Review comment:
   I told Bruno to do this; we can discuss next week if you still have 
questions, but the motivation is to avoid letting the tasks of a topology/query 
get way out of sync with each other. This can result in missed output, for 
example when a processor upstream of one side of a join is paused for a long 
time while the other side continues processing records and trying to join them.
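Here is a minimal sketch of what `moveActiveTasksToTailFor` could do. It moves all tasks of the named topology to the end of the processing order in one step, preserving relative order within both groups, so that topology's tasks stay in step with each other. Tasks are modeled as hypothetical `"<topology>:<id>"` strings rather than the real `Task`/`TaskId` types.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified model of the ordered active-task list.
final class TaskOrderSketch {
    private final List<String> orderedActiveTasks = new ArrayList<>();

    void add(String task) {
        orderedActiveTasks.add(task);
    }

    // Move every task of topologyName to the tail together, keeping the
    // relative order of both the moved and the remaining tasks.
    void moveActiveTasksToTailFor(String topologyName) {
        List<String> head = new ArrayList<>();
        List<String> tail = new ArrayList<>();
        for (String task : orderedActiveTasks) {
            if (topologyOf(task).equals(topologyName)) {
                tail.add(task);
            } else {
                head.add(task);
            }
        }
        orderedActiveTasks.clear();
        orderedActiveTasks.addAll(head);
        orderedActiveTasks.addAll(tail);
    }

    List<String> orderedActiveTasks() {
        return Collections.unmodifiableList(orderedActiveTasks);
    }

    // Hypothetical naming convention: "<topology>:<id>".
    private static String topologyOf(String task) {
        return task.substring(0, task.indexOf(':'));
    }

    public static void main(String[] args) {
        TaskOrderSketch tasks = new TaskOrderSketch();
        tasks.add("A:0");
        tasks.add("B:0");
        tasks.add("A:1");
        tasks.add("C:0");
        tasks.moveActiveTasksToTailFor("A");
        System.out.println(tasks.orderedActiveTasks());  // [B:0, C:0, A:0, A:1]
    }
}
```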








[GitHub] [kafka] jasonk000 commented on pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm

2022-02-04 Thread GitBox


jasonk000 commented on pull request #11721:
URL: https://github.com/apache/kafka/pull/11721#issuecomment-1030489854


   I've addressed the comments; I'll wait and see what spotbugs says this time. 
Locally it's all clear, but on CI it flags issues in code that is not related 
to this PR.






[jira] [Resolved] (KAFKA-13346) Kafka Streams fails due to RocksDB Locks Not Available Exception

2022-02-04 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13346.
---
Resolution: Not A Problem

> Kafka Streams fails due to RocksDB Locks Not Available Exception
> 
>
> Key: KAFKA-13346
> URL: https://issues.apache.org/jira/browse/KAFKA-13346
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Amit Gupta
>Priority: Major
>
> Hello,
> We are using Kafka Streams and observe that sometimes, on some of the 
> hosts running the Streams application, the Kafka Streams instance fails with an 
> unexpected exception. We are running with 40 stream threads per host and 20 
> hosts in total.
> Can someone please help identify the root cause here?
>  
> org.apache.kafka.streams.errors.ProcessorStateException: Error opening store 
> state-store at location .
>  at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:214)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:188)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:224)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:42)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$0(MeteredKeyValueStore.java:101)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:101)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:199)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:76)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:95)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:426)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:660)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
>  ~[kafka-streams-2.6.0.jar:?]
>  Caused by: org.rocksdb.RocksDBException: lock : 
> ./0_468/rocksdb/state-store/LOCK: No locks available
>  at org.rocksdb.RocksDB.open(Native Method) ~[rocksdbjni-5.18.3.jar:?]
>  at org.rocksdb.RocksDB.open(RocksDB.java:286) ~[rocksdbjni-5.18.3.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:211)
>  ~[kafka-streams-2.6.0.jar:?]
>  ... 15 more
>   
>  Sometimes I also see this exception:
>
> org.apache.kafka.streams.errors.ProcessorStateException: Error opening store 
> state-store at location ./0_433/rocksdb/state-store
>  at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:214)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:188)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:224)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:42)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$0(MeteredKeyValueStore.java:101)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>  

[jira] [Commented] (KAFKA-13346) Kafka Streams fails due to RocksDB Locks Not Available Exception

2022-02-04 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17487405#comment-17487405
 ] 

Guozhang Wang commented on KAFKA-13346:
---

I'm closing this ticket for now since we have not heard back from Amit. Please 
feel free to re-open it if there are new updates.

> Kafka Streams fails due to RocksDB Locks Not Available Exception
> 
>
> Key: KAFKA-13346
> URL: https://issues.apache.org/jira/browse/KAFKA-13346
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Amit Gupta
>Priority: Major
>

[GitHub] [kafka] Kvicii commented on pull request #11543: Update release.py

2022-02-04 Thread GitBox


Kvicii commented on pull request #11543:
URL: https://github.com/apache/kafka/pull/11543#issuecomment-1030526684


   @dajac 
   This is unnecessary; I think this PR can be closed.






[GitHub] [kafka] ijuma commented on pull request #11586: KAFKA-13516: Connection level metrics are not closed

2022-02-04 Thread GitBox


ijuma commented on pull request #11586:
URL: https://github.com/apache/kafka/pull/11586#issuecomment-1030531283


   Cc @apovzner @splett2 






[GitHub] [kafka] ijuma commented on pull request #11579: KAFKA-13518: Update gson dependency

2022-02-04 Thread GitBox


ijuma commented on pull request #11579:
URL: https://github.com/apache/kafka/pull/11579#issuecomment-1030531381


   Thanks for the PR. Seems like the new version has more false positives. Do 
you know if they intend to fix those?



[GitHub] [kafka] ijuma closed pull request #11543: Update release.py

2022-02-04 Thread GitBox


ijuma closed pull request #11543:
URL: https://github.com/apache/kafka/pull/11543


   



[GitHub] [kafka] ijuma commented on pull request #11469: MINOR: disable zookeeper.sasl.client to avoid false error

2022-02-04 Thread GitBox


ijuma commented on pull request #11469:
URL: https://github.com/apache/kafka/pull/11469#issuecomment-1030532433


   Any reason why this wasn't merged?



[GitHub] [kafka] ijuma closed pull request #11377: MINOR: Use try-with-resource to close the stream opened by Files.list()

2022-02-04 Thread GitBox


ijuma closed pull request #11377:
URL: https://github.com/apache/kafka/pull/11377


   



[GitHub] [kafka] ijuma commented on pull request #11376: KAFKA-13342: LeaderAndIsrRequest should not be sent for topic queued for deletion

2022-02-04 Thread GitBox


ijuma commented on pull request #11376:
URL: https://github.com/apache/kafka/pull/11376#issuecomment-1030532699


   Are you planning to address the comments @lbradstreet ?



[GitHub] [kafka] ijuma commented on pull request #11137: KAFKA-13133 Replace EasyMock and PowerMock with Mockito for AbstractHerderTest

2022-02-04 Thread GitBox


ijuma commented on pull request #11137:
URL: https://github.com/apache/kafka/pull/11137#issuecomment-1030532879


   @wycc are you planning to address the comments?



[GitHub] [kafka] ijuma commented on pull request #11159: MINOR: Fix logging in ClusterControlManager

2022-02-04 Thread GitBox


ijuma commented on pull request #11159:
URL: https://github.com/apache/kafka/pull/11159#issuecomment-1030533071


   @jsancio are you going to merge this?



[GitHub] [kafka] ijuma commented on pull request #11161: MINOR: Remove node from API versions cache on NetworkClient.close(nodeId)

2022-02-04 Thread GitBox


ijuma commented on pull request #11161:
URL: https://github.com/apache/kafka/pull/11161#issuecomment-1030533145


   @rajinisivaram friendly ping.



[GitHub] [kafka] ijuma commented on a change in pull request #11204: KAFKA-13188 - Release the memory back into MemoryPool

2022-02-04 Thread GitBox


ijuma commented on a change in pull request #11204:
URL: https://github.com/apache/kafka/pull/11204#discussion_r78166



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClientResponseWithFinalize.java
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients;
+
+import java.nio.ByteBuffer;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+
+/**
+ * This is a decorator for ClientResponse used to verify (at finalization time) that any underlying memory for this
+ * response has been returned to the pool. To be used only as a debugging aid.
+ */
+public class ClientResponseWithFinalize extends ClientResponse {
+    private final LogContext logContext;
+
+    public ClientResponseWithFinalize(RequestHeader requestHeader, RequestCompletionHandler callback,
+            String destination, long createdTimeMs, long receivedTimeMs, boolean disconnected,
+            UnsupportedVersionException versionMismatch, AuthenticationException authenticationException,
+            AbstractResponse responseBody, MemoryPool memoryPool, ByteBuffer responsePayload, LogContext logContext) {
+        super(requestHeader, callback, destination, createdTimeMs, receivedTimeMs, disconnected, versionMismatch,
+            authenticationException, responseBody, memoryPool, responsePayload);
+        this.logContext = logContext;
+    }
+
+    private Logger getLogger() {
+        if (logContext != null) {
+            return logContext.logger(ClientResponseWithFinalize.class);
+        }
+        return log;
+    }
+
+    protected void checkAndForceBufferRelease() {
+        if (memoryPool != null && responsePayload != null) {
+            getLogger().error("ByteBuffer[{}] not released. Ref Count: {}. RequestType: {}", responsePayload.position(),
+                refCount.get(), this.requestHeader.apiKey());
+            memoryPool.release(responsePayload);
+            responsePayload = null;
+        }
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+        super.finalize();
+        checkAndForceBufferRelease();
+    }

Review comment:
   Finalizers are deprecated, so it would be good to avoid them.
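One commonly used replacement for `finalize()` is `java.lang.ref.Cleaner`. The following is a minimal sketch under stated assumptions, not the PR's code. It assumes a Java 9+ baseline, since `Cleaner` does not exist in Java 8. It also introduces a hypothetical `BufferPool` interface as a stand-in for Kafka's `MemoryPool`, and a simplified `TrackedResponse` type. An explicit `close()` returns the buffer to the pool, and the registered action acts as a safety net if the object becomes unreachable without being closed.

```java
import java.lang.ref.Cleaner;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a buffer pool; not Kafka's MemoryPool API.
interface BufferPool {
    void release(ByteBuffer buffer);
}

// Releases a pooled buffer on close(); a Cleaner acts as a safety net after GC.
final class TrackedResponse implements AutoCloseable {
    // One shared Cleaner; it runs cleaning actions on its own daemon thread.
    private static final Cleaner CLEANER = Cleaner.create();

    // The action must not reference the TrackedResponse itself, otherwise the
    // response would never become phantom-reachable and would never be cleaned.
    private static final class ReleaseAction implements Runnable {
        private final BufferPool pool;
        private final ByteBuffer payload;

        ReleaseAction(BufferPool pool, ByteBuffer payload) {
            this.pool = pool;
            this.payload = payload;
        }

        @Override
        public void run() {
            pool.release(payload);
        }
    }

    private final Cleaner.Cleanable cleanable;

    TrackedResponse(BufferPool pool, ByteBuffer payload) {
        this.cleanable = CLEANER.register(this, new ReleaseAction(pool, payload));
    }

    @Override
    public void close() {
        // clean() runs the action at most once, however many times it is called
        // and even if the Cleaner already ran it after the object became unreachable.
        cleanable.clean();
    }

    public static void main(String[] args) {
        AtomicInteger releases = new AtomicInteger();
        BufferPool pool = buffer -> releases.incrementAndGet();
        TrackedResponse response = new TrackedResponse(pool, ByteBuffer.allocate(16));
        response.close();
        response.close(); // no-op: the action already ran
        System.out.println(releases.get()); // prints 1
    }
}
```

To keep the "debugging aid" behaviour of the original class, the cleaning action could also log a warning when it runs without an explicit `close()`. The sketch leaves that out for brevity.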





[GitHub] [kafka] ijuma commented on pull request #11229: KAFKA-12961; Verify group generation in `DelayedJoin`

2022-02-04 Thread GitBox


ijuma commented on pull request #11229:
URL: https://github.com/apache/kafka/pull/11229#issuecomment-1030533521


   @dajac is this still relevant?



[GitHub] [kafka] ijuma commented on pull request #11238: MINOR: Fix force kill of KRaft colocated controllers in system tests

2022-02-04 Thread GitBox


ijuma commented on pull request #11238:
URL: https://github.com/apache/kafka/pull/11238#issuecomment-1030533632


   Cc @jsancio @cmccabe 



[GitHub] [kafka] showuon closed pull request #11661: [WIP] MINOR: shutdown thread test

2022-02-04 Thread GitBox


showuon closed pull request #11661:
URL: https://github.com/apache/kafka/pull/11661


   



[GitHub] [kafka] showuon commented on pull request #11646: [WIP] KAFKA-13566: producer exponential backoff implementation for KIP-580

2022-02-04 Thread GitBox


showuon commented on pull request #11646:
URL: https://github.com/apache/kafka/pull/11646#issuecomment-1030567555


   @hachikuji, thanks for the comment. Yes, I've actually split the producer 
and consumer changes into 2 PRs (there's also a 3rd one for adminClient). It's 
just that some changes are still shared between the producer and consumer. Please 
wait for https://github.com/apache/kafka/pull/11627 to get merged before 
reviewing this. I've marked it as `WIP`. Thank you.



[GitHub] [kafka] ableegoldman commented on a change in pull request #11712: WIP: Put failed tasks to end of processing list

2022-02-04 Thread GitBox


ableegoldman commented on a change in pull request #11712:
URL: https://github.com/apache/kafka/pull/11712#discussion_r800029186



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##
@@ -270,6 +278,23 @@ Task task(final TaskId taskId) {
         return readOnlyActiveTasks;
     }
 
+    List<Task> orderedActiveTasks() {
+        return Collections.unmodifiableList(orderedActiveTasks);
+    }
+
+    void moveActiveTasksToTailFor(final String topologyName) {
+        final List<Task> tasksToMove = new LinkedList<>();
+        final Iterator<Task> iterator = orderedActiveTasks.iterator();
+        while (iterator.hasNext()) {
+            final Task task = iterator.next();
+            if (task.id().topologyName().equals(topologyName)) {
+                iterator.remove();
+                tasksToMove.add(task);
+            }
+        }
+        orderedActiveTasks.addAll(tasksToMove);

Review comment:
   > So we need to take the tasks that we know will fail and process all 
the other tasks without them
   
   There's definitely an implicit assumption here that the exception is (a) 
deterministic, and (b) directly correlated with some aspect of that specific task 
(e.g. a de/serialization exception, an NPE from input with a null field, or failed 
authorization on its topics), and not a system error that happened to hit during 
that task's processing (e.g. RocksDBException: too many open files, out of memory, etc.)
   
   I'm not saying we need to account for this in the first pass. I do think it's 
reasonable to assume that reprocessing the failed task will result in the same 
error, since that's definitely true for what I suspect is the large majority of 
errors, like de/serialization or invalid timestamp errors, NPEs, etc. But it's 
worth keeping in mind, especially when we roll this out and can get actual data 
on how reasonable these assumptions are.
   
   On that note, I should make sure to add some kind of logging that will 
allow us to count how often a failed task repeated the same error, or any kind 
of error. (This could even become a metric eventually.) In the mid-to-far future 
we should have some kind of finer-grained error classification that we could 
lean on as a heuristic for whether to retry the task immediately, back off 
for a while, or even restart the runtime for fatal system errors (e.g. OOM).
   
   Anyway, I'll file some tickets for all this in the V2+ milestone. I just 
wanted to get us thinking about this sort of thing so we have some vision of 
the future, optimized error-handling mechanism to inform how we lay the 
groundwork now.
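Here is a rough illustration of the counting idea. The minimal sketch below records, per task, how many times in a row the task has failed with the same exception class. Everything in it is a hypothetical assumption, not Kafka Streams code. This includes the `RepeatedErrorTracker` name, keying by a plain task-id string, and comparing exception classes as a proxy for "the same error".

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: counts consecutive failures of a task with the same exception type.
final class RepeatedErrorTracker {
    private static final class Streak {
        final Class<? extends Throwable> errorType;
        final int count;

        Streak(Class<? extends Throwable> errorType, int count) {
            this.errorType = errorType;
            this.count = count;
        }
    }

    private final Map<String, Streak> streaks = new HashMap<>();

    // Records a failure and returns how many times in a row this task has failed
    // with the same exception class (1 for a first or different error).
    int recordFailure(String taskId, Throwable error) {
        Class<? extends Throwable> type = error.getClass();
        Streak previous = streaks.get(taskId);
        int count = (previous != null && previous.errorType == type) ? previous.count + 1 : 1;
        streaks.put(taskId, new Streak(type, count));
        return count;
    }

    // Clears the streak once the task processes successfully again.
    void recordSuccess(String taskId) {
        streaks.remove(taskId);
    }

    public static void main(String[] args) {
        RepeatedErrorTracker tracker = new RepeatedErrorTracker();
        System.out.println(tracker.recordFailure("0_1", new NullPointerException()));  // prints 1
        System.out.println(tracker.recordFailure("0_1", new NullPointerException()));  // prints 2
        System.out.println(tracker.recordFailure("0_1", new IllegalStateException())); // prints 1
    }
}
```

The returned count could feed a log line or, eventually, a metric. Comparing by class alone is deliberately coarse. A finer-grained classification, as the comment suggests, would replace that comparison.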





[GitHub] [kafka] showuon commented on a change in pull request #11691: KAFKA-13598: enable idempotence producer by default and validate the configs

2022-02-04 Thread GitBox


showuon commented on a change in pull request #11691:
URL: https://github.com/apache/kafka/pull/11691#discussion_r800029493



##
File path: docs/upgrade.html
##
@@ -19,6 +19,13 @@
 
 

[GitHub] [kafka] showuon commented on a change in pull request #11691: KAFKA-13598: enable idempotence producer by default and validate the configs

2022-02-04 Thread GitBox


showuon commented on a change in pull request #11691:
URL: https://github.com/apache/kafka/pull/11691#discussion_r800030072



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
##
@@ -514,12 +512,11 @@ public ProducerConfig(Map<String, Object> props) {
     }
 
     boolean idempotenceEnabled() {
-        boolean userConfiguredIdempotence = this.originals().containsKey(ENABLE_IDEMPOTENCE_CONFIG);
         boolean userConfiguredTransactions = this.originals().containsKey(TRANSACTIONAL_ID_CONFIG);
-        boolean idempotenceEnabled = userConfiguredIdempotence && this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
-
-        if (!idempotenceEnabled && userConfiguredIdempotence && userConfiguredTransactions)
+        boolean idempotenceEnabled = this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
+        if (!idempotenceEnabled && userConfiguredTransactions)
             throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
+
         return userConfiguredTransactions || idempotenceEnabled;

Review comment:
   You are right! After we default `enable.idempotence` to true, there's no 
way to have `idempotenceEnabled==false` but `userConfiguredTransactions==true`. 
Removed it.
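The reasoning in this exchange can be checked with a small standalone model. This is a sketch under the following assumptions. Configs are a plain `Map<String, Object>` with `Boolean` values. `enable.idempotence` defaults to `true` when absent. `IllegalArgumentException` stands in for Kafka's `ConfigException`. The `IdempotenceCheck` class is hypothetical, not `ProducerConfig` itself.

Because the default is `true`, `idempotenceEnabled == false` can only come from an explicit user setting. Once the check has thrown for "disabled plus `transactional.id`", `userConfiguredTransactions || idempotenceEnabled` reduces to `idempotenceEnabled`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the idempotence check; not ProducerConfig itself.
final class IdempotenceCheck {
    static final String ENABLE_IDEMPOTENCE = "enable.idempotence";
    static final String TRANSACTIONAL_ID = "transactional.id";

    static boolean idempotenceEnabled(Map<String, Object> originals) {
        boolean userConfiguredTransactions = originals.containsKey(TRANSACTIONAL_ID);
        // Assumed default of true when the user did not set the key.
        boolean idempotenceEnabled = (Boolean) originals.getOrDefault(ENABLE_IDEMPOTENCE, true);
        if (!idempotenceEnabled && userConfiguredTransactions) {
            throw new IllegalArgumentException(
                "Cannot set a " + TRANSACTIONAL_ID + " without also enabling idempotence.");
        }
        // Past the check, a transactional.id implies idempotence is enabled,
        // so "userConfiguredTransactions || idempotenceEnabled" equals this value.
        return idempotenceEnabled;
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        System.out.println(idempotenceEnabled(configs)); // prints true (default)
        configs.put(ENABLE_IDEMPOTENCE, false);
        System.out.println(idempotenceEnabled(configs)); // prints false
        configs.put(TRANSACTIONAL_ID, "tx-1");
        try {
            idempotenceEnabled(configs);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```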





[GitHub] [kafka] dajac commented on a change in pull request #11586: KAFKA-13516: Connection level metrics are not closed

2022-02-04 Thread GitBox


dajac commented on a change in pull request #11586:
URL: https://github.com/apache/kafka/pull/11586#discussion_r800030837



##
File path: clients/src/main/java/org/apache/kafka/common/network/Selector.java
##
@@ -1144,7 +1149,11 @@ public void close() {
         public SelectorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection) {
             this.metrics = metrics;
             this.metricTags = metricTags;
-            this.metricsPerConnection = metricsPerConnection;
+            if (metricsPerConnection) {
+                this.connectionMetrics = new ConcurrentHashMap<>();

Review comment:
   Do we really need a `ConcurrentHashMap` here? My understanding is that 
the selector should be used by a single thread. Isn't that the case here?
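If the selector really is confined to one thread, a plain `HashMap` supports the same `computeIfAbsent`/`computeIfPresent` pattern the diff uses. That includes removing the mapping when the remapping function returns `null`. Below is a minimal sketch. It uses a hypothetical `ConnectionRegistry` class, with `String` sensor names standing in for Kafka `Sensor`s, and it is intentionally not thread-safe.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical single-threaded registry; sensor names stand in for Kafka Sensors.
final class ConnectionRegistry {
    // Plain HashMap: only safe if every call comes from the same thread.
    private final Map<String, Set<String>> sensorsByConnection = new HashMap<>();
    private final Function<String, Set<String>> sensorFactory;
    private final List<String> removedSensors = new ArrayList<>();

    ConnectionRegistry(Function<String, Set<String>> sensorFactory) {
        this.sensorFactory = sensorFactory;
    }

    void maybeRegister(String connectionId) {
        // The factory runs only if the connection has no entry yet.
        sensorsByConnection.computeIfAbsent(connectionId, sensorFactory);
    }

    void maybeUnregister(String connectionId) {
        sensorsByConnection.computeIfPresent(connectionId, (id, sensors) -> {
            removedSensors.addAll(sensors); // stand-in for metrics.removeSensor(...)
            return null;                    // a null result removes the mapping
        });
    }

    boolean isRegistered(String connectionId) {
        return sensorsByConnection.containsKey(connectionId);
    }

    List<String> removedSensors() {
        return removedSensors;
    }

    public static void main(String[] args) {
        int[] created = {0};
        ConnectionRegistry registry = new ConnectionRegistry(id -> {
            created[0]++;
            return new HashSet<>(Arrays.asList("node-" + id + ".bytes-sent", "node-" + id + ".latency"));
        });
        registry.maybeRegister("1");
        registry.maybeRegister("1");   // no-op: already registered
        registry.maybeUnregister("1");
        registry.maybeUnregister("1"); // no-op: already removed
        // prints "1 2 false"
        System.out.println(created[0] + " " + registry.removedSensors().size() + " " + registry.isRegistered("1"));
    }
}
```

If the map can be touched from another thread (for example, a metrics reporter), `ConcurrentHashMap` would still be the safer choice. The sketch only shows that the compute-style API itself does not require it.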

##
File path: clients/src/main/java/org/apache/kafka/common/network/Selector.java
##
@@ -1305,44 +1314,62 @@ private Sensor sensor(String name, Sensor... parents) {
         }
 
         public void maybeRegisterConnectionMetrics(String connectionId) {
-            if (!connectionId.isEmpty() && metricsPerConnection) {
-                // if one sensor of the metrics has been registered for the connection,
-                // then all other sensors should have been registered; and vice versa
-                String nodeRequestName = "node-" + connectionId + ".requests-sent";
-                Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
-                if (nodeRequest == null) {
-                    Map<String, String> tags = new LinkedHashMap<>(metricTags);
-                    tags.put("node-id", "node-" + connectionId);
-
-                    nodeRequest = sensor(nodeRequestName);
-                    nodeRequest.add(createMeter(metrics, perConnectionMetricGrpName, tags, new WindowedCount(), "request", "requests sent"));
-                    MetricName metricName = metrics.metricName("request-size-avg", perConnectionMetricGrpName, "The average size of requests sent.", tags);
-                    nodeRequest.add(metricName, new Avg());
-                    metricName = metrics.metricName("request-size-max", perConnectionMetricGrpName, "The maximum size of any request sent.", tags);
-                    nodeRequest.add(metricName, new Max());
-
-                    String bytesSentName = "node-" + connectionId + ".bytes-sent";
-                    Sensor bytesSent = sensor(bytesSentName);
-                    bytesSent.add(createMeter(metrics, perConnectionMetricGrpName, tags, "outgoing-byte", "outgoing bytes"));
-
-                    String nodeResponseName = "node-" + connectionId + ".responses-received";
-                    Sensor nodeResponse = sensor(nodeResponseName);
-                    nodeResponse.add(createMeter(metrics, perConnectionMetricGrpName, tags, new WindowedCount(), "response", "responses received"));
-
-                    String bytesReceivedName = "node-" + connectionId + ".bytes-received";
-                    Sensor bytesReceive = sensor(bytesReceivedName);
-                    bytesReceive.add(createMeter(metrics, perConnectionMetricGrpName, tags, "incoming-byte", "incoming bytes"));
-
-                    String nodeTimeName = "node-" + connectionId + ".latency";
-                    Sensor nodeRequestTime = sensor(nodeTimeName);
-                    metricName = metrics.metricName("request-latency-avg", perConnectionMetricGrpName, tags);
-                    nodeRequestTime.add(metricName, new Avg());
-                    metricName = metrics.metricName("request-latency-max", perConnectionMetricGrpName, tags);
-                    nodeRequestTime.add(metricName, new Max());
-                }
+            if (!connectionId.isEmpty() && connectionMetrics != null) {
+                connectionMetrics.computeIfAbsent(connectionId, (key) -> {
+                    // key: connection id
+                    // value: set of sensors (currently null)
+                    return perConnectionSensors(key);
+                });
             }
         }
 
+        public void maybeUnregisterConnectionMetrics(String connectionId) {
+            if (!connectionId.isEmpty() && connectionMetrics != null) {
+                connectionMetrics.computeIfPresent(connectionId, (key, value) -> {
+                    // key: connection id
+                    // value: set of sensors
+                    for (Sensor sensor : value) {
+                        metrics.removeSensor(sensor.name());
+                    }
+                    return null;
+                });
+            }
+        }
+
+        private Set<Sensor> perConnectionSensors(String connectionId) {

Review comment:
   Would it make sense to create a `ConnectionMetrics` class to hold all 
the connection metrics? That would give us an opportunity to improve all the 
`record*` methods as well. They could get the sensors based on the 
`connectionId`.
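To make the suggested `ConnectionMetrics` holder concrete, here is one hypothetical shape for it. Each connection gets a single object that owns its sensors, and the `record*` methods resolve that object by `connectionId`. Kafka's `Metrics`/`Sensor` types are replaced by plain counters so the sketch runs on its own. The class and method names are assumptions, not Kafka API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-connection holder; plain counters stand in for Kafka Sensors.
final class ConnectionMetrics {
    private long requestsSent;
    private long bytesSent;
    private long responsesReceived;
    private long bytesReceived;

    void recordRequest(long bytes) {
        requestsSent++;
        bytesSent += bytes;
    }

    void recordResponse(long bytes) {
        responsesReceived++;
        bytesReceived += bytes;
    }

    long requestsSent() { return requestsSent; }
    long bytesSent() { return bytesSent; }
    long responsesReceived() { return responsesReceived; }
    long bytesReceived() { return bytesReceived; }
}

// Hypothetical owner that looks up the holder by connection id.
final class SelectorMetricsSketch {
    // Plain HashMap, assuming single-threaded use.
    private final Map<String, ConnectionMetrics> byConnection = new HashMap<>();

    void maybeRegister(String connectionId) {
        byConnection.computeIfAbsent(connectionId, id -> new ConnectionMetrics());
    }

    void maybeUnregister(String connectionId) {
        // In Kafka, this is where the connection's sensors would be removed from Metrics.
        byConnection.remove(connectionId);
    }

    void recordRequest(String connectionId, long bytes) {
        ConnectionMetrics m = byConnection.get(connectionId);
        if (m != null) {
            m.recordRequest(bytes); // unknown connections are ignored
        }
    }

    void recordResponse(String connectionId, long bytes) {
        ConnectionMetrics m = byConnection.get(connectionId);
        if (m != null) {
            m.recordResponse(bytes);
        }
    }

    ConnectionMetrics metricsFor(String connectionId) {
        return byConnection.get(connectionId);
    }

    public static void main(String[] args) {
        SelectorMetricsSketch metrics = new SelectorMetricsSketch();
        metrics.recordRequest("1", 10); // ignored: not registered yet
        metrics.maybeRegister("1");
        metrics.recordRequest("1", 100);
        metrics.recordResponse("1", 30);
        ConnectionMetrics m = metrics.metricsFor("1");
        System.out.println(m.requestsSent() + " " + m.bytesSent() + " " + m.bytesReceived()); // prints "1 100 30"
    }
}
```

One benefit of this shape is that unregistering a connection removes a single map entry. It avoids reconstructing sensor names from string prefixes.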





[GitHub] [kafka] runom opened a new pull request #11735: MINOR: Check the help and version options firstly

2022-02-04 Thread GitBox


runom opened a new pull request #11735:
URL: https://github.com/apache/kafka/pull/11735


   Currently, `bin/kafka-consumer-groups.sh --version` requires the unnecessary 
`bootstrap-server` option.
   ```
   % bin/kafka-consumer-groups.sh --version
   Missing required argument "[bootstrap-server]"
   Option                                  Description
   ------                                  -----------
   --all-groups                            Apply to all consumer groups.
   --all-topics                            Consider all topics assigned to a
                                             group in the `reset-offsets`
                                             process.
   ...
   --version                               Display Kafka version.
   ```
   
   ```
   % bin/kafka-consumer-groups.sh --version --bootstrap-server=localhost:9092
   3.2.0-SNAPSHOT (Commit:21c3009ac12f79d0)
   ```
   
   This PR fixes this problem.
   
   ```
   % bin/kafka-consumer-groups.sh --version
   3.2.0-SNAPSHOT (Commit:efe4bfd2c49997f7)
   ```
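The ordering fix can be sketched without any argument-parsing library: look for `--help`/`--version` before enforcing required options. This is a hypothetical simplification in plain Java, not the tool's actual parser. The `OptionOrderSketch.run` method and its return strings are assumptions, and the check accepts a bare `--bootstrap-server` without verifying that a value follows.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of checking informational options before required ones.
final class OptionOrderSketch {
    static String run(String[] args) {
        List<String> argList = Arrays.asList(args);
        // 1. --help and --version short-circuit all other validation.
        if (argList.contains("--help")) {
            return "usage: kafka-consumer-groups.sh --bootstrap-server <server> ...";
        }
        if (argList.contains("--version")) {
            return "version";
        }
        // 2. Only now enforce the required --bootstrap-server option
        //    (accepting both "--bootstrap-server host" and "--bootstrap-server=host").
        boolean hasBootstrap = argList.stream()
            .anyMatch(a -> a.equals("--bootstrap-server") || a.startsWith("--bootstrap-server="));
        if (!hasBootstrap) {
            return "Missing required argument \"[bootstrap-server]\"";
        }
        return "ok";
    }

    public static void main(String[] args) {
        System.out.println(run(new String[] {"--version"}));                            // prints "version"
        System.out.println(run(new String[] {}));                                       // prints the missing-argument error
        System.out.println(run(new String[] {"--bootstrap-server", "localhost:9092"})); // prints "ok"
    }
}
```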
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

