[GitHub] [kafka] ableegoldman commented on a change in pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
ableegoldman commented on a change in pull request #11424: URL: https://github.com/apache/kafka/pull/11424#discussion_r799253840

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java
## @@ -116,14 +123,30 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo

         maxBufferedSize = getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
         log.info("Topology {} is overriding {} to {}", topologyName, BUFFERED_RECORDS_PER_PARTITION_CONFIG, maxBufferedSize);
     } else {
-        maxBufferedSize = globalAppConfigs.getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
+        maxBufferedSize = globalAppConfigs.originals().containsKey(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG)
+            ? globalAppConfigs.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG) : -1;
     }

-    if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
-        cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
-        log.info("Topology {} is overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
+    if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides) ||
+        isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
+
+        if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides) && isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
+            cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+            log.info("Topology {} is using both {} and deprecated config {}. overriding {} to {}",
+                topologyName,
+                STATESTORE_CACHE_MAX_BYTES_CONFIG,
+                CACHE_MAX_BYTES_BUFFERING_CONFIG,
+                STATESTORE_CACHE_MAX_BYTES_CONFIG,
+                cacheSize);
+        } else if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
+            cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
+            log.info("Topology {} is using deprecated config {}. overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
+        } else {
+            cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+            log.info("Topology {} is overriding {} to {}", topologyName, STATESTORE_CACHE_MAX_BYTES_CONFIG, cacheSize);
+        }
     } else {
-        cacheSize = globalAppConfigs.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
+        cacheSize = globalAppConfigs.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);

Review comment: Sounds good! There's no rush, but I'll make sure we have your new PRs reviewed and merged quickly whenever they are ready, since you've worked so hard on this already. I'm sorry I wasn't able to make another pass on your original PR, but hopefully this won't be too much of a bother.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
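The precedence rule the diff above encodes -- a topology-level override of either cache config wins over the app-wide value, and the new `statestore.cache.max.bytes` beats the deprecated `cache.max.bytes.buffering` when both are overridden -- can be reduced to a small helper. This is a minimal sketch under those assumptions, not the actual Kafka Streams code; the class and method names are illustrative:

```java
import java.util.Map;

// Hypothetical illustration of the config precedence discussed above.
final class CacheSizeResolver {
    static final String NEW_CONFIG = "statestore.cache.max.bytes";       // preferred
    static final String DEPRECATED_CONFIG = "cache.max.bytes.buffering"; // deprecated

    // Topology override beats app-wide value; new name beats deprecated
    // name when both are overridden for the same topology.
    static long resolve(Map<String, Long> topologyOverrides, Map<String, Long> appConfigs) {
        if (topologyOverrides.containsKey(NEW_CONFIG)) {
            return topologyOverrides.get(NEW_CONFIG);       // wins even if both are set
        }
        if (topologyOverrides.containsKey(DEPRECATED_CONFIG)) {
            return topologyOverrides.get(DEPRECATED_CONFIG);
        }
        return appConfigs.get(NEW_CONFIG);                  // fall back to the global value
    }
}
```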
[GitHub] [kafka] ableegoldman commented on a change in pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
ableegoldman commented on a change in pull request #11424: URL: https://github.com/apache/kafka/pull/11424#discussion_r799261397

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
## @@ -1034,14 +1037,15 @@ private static Metrics getMetrics(final StreamsConfig config, final Time time, f

     final StreamThread streamThread;
     synchronized (changeThreadCount) {
         final int threadIdx = getNextThreadIndex();
-        final int numLiveThreads = getNumLiveStreamThreads();
-        final long cacheSizePerThread = getCacheSizePerThread(numLiveThreads + 1);
-        log.info("Adding StreamThread-{}, there will now be {} live threads and the new cache size per thread is {}",
-                 threadIdx, numLiveThreads + 1, cacheSizePerThread);
-        resizeThreadCache(cacheSizePerThread);
         // Creating thread should hold the lock in order to avoid duplicate thread index.
         // If the duplicate index happen, the metadata of thread may be duplicate too.
-        streamThread = createAndAddStreamThread(cacheSizePerThread, threadIdx);
+        // Also, we create the new thread with initial values of cache size and max buffer size as 0
+        // and then resize them later
+        streamThread = createAndAddStreamThread(0L, 0L, threadIdx);
+        final int numLiveThreads = getNumLiveStreamThreads();
+        resizeThreadCacheAndBufferMemory(numLiveThreads + 1);

Review comment: One more thing -- @wcarlson5 mentioned there was an off-by-one error here due to the re-ordering of these calls. Specifically, the `+ 1` in this line was necessary before now because we called `resizeThreadCache` before actually adding the new thread, so we had to account for the new thread by adding one. But since we now create/add the new thread first, the `getNumLiveStreamThreads` method will actually return the correct number of threads, so we don't need the `+ 1` anymore.

On that note, I take it we reordered these calls because we now create the thread without the cache value and then call `resize` to set the cache after the thread has already been created. I was wondering: why do we need to do this post-construction resizing? I only looked at this part of the PR briefly, but it seems to me like we always know the actual cache size when we're creating the thread, so can't we just pass that in to the StreamThread#create method/constructor? It's just a bit confusing to initialize the cache size to some random value; it took me a little while to figure out what was going on with that.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
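The off-by-one point above comes down to which side of the resize the thread registration happens on. A hypothetical sketch, assuming a fixed total cache budget (only `createAndAddStreamThread` and `getNumLiveStreamThreads` are names from the diff; the rest is illustrative):

```java
// Sketch of the ordering issue around dividing a fixed cache budget per thread.
class CacheResizeSketch {
    private final long totalCacheBytes = 100 * 1024 * 1024;
    private int liveThreads = 2; // threads currently registered

    // Old ordering: resize runs BEFORE the new thread is registered,
    // so the divisor must anticipate it with "+ 1".
    long perThreadCacheOldOrdering() {
        return totalCacheBytes / (liveThreads + 1);
    }

    // New ordering: the thread is registered first, so the live-thread count
    // already includes it; an extra "+ 1" would over-divide the cache and
    // give every thread too small a share.
    long perThreadCacheNewOrdering() {
        liveThreads += 1; // createAndAddStreamThread(...) has already run
        return totalCacheBytes / liveThreads; // no "+ 1"
    }
}
```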
[GitHub] [kafka] predatorray commented on pull request #10525: KAFKA-7572: Producer should not send requests with negative partition id
predatorray commented on pull request #10525: URL: https://github.com/apache/kafka/pull/10525#issuecomment-1029767796 @guozhangwang Thanks for the reply and the review. The change was rebased. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a change in pull request #11390: [KAFKA-13369] Follower fetch protocol changes for tiered storage.
satishd commented on a change in pull request #11390: URL: https://github.com/apache/kafka/pull/11390#discussion_r796465473

## File path: clients/src/main/java/org/apache/kafka/common/record/RemoteLogInputStream.java
## @@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import static org.apache.kafka.common.record.Records.HEADER_SIZE_UP_TO_MAGIC;
+import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
+import static org.apache.kafka.common.record.Records.MAGIC_OFFSET;
+import static org.apache.kafka.common.record.Records.OFFSET_OFFSET;
+import static org.apache.kafka.common.record.Records.SIZE_OFFSET;
+
+public class RemoteLogInputStream implements LogInputStream<RecordBatch> {
+    private final InputStream is;
+    private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(HEADER_SIZE_UP_TO_MAGIC);
+
+    public RemoteLogInputStream(InputStream is) {
+        this.is = is;
+    }
+
+    @Override
+    public RecordBatch nextBatch() throws IOException {
+        logHeaderBuffer.rewind();
+        Utils.readFully(is, logHeaderBuffer);
+
+        if (logHeaderBuffer.position() < HEADER_SIZE_UP_TO_MAGIC)
+            return null;
+
+        logHeaderBuffer.rewind();
+        logHeaderBuffer.getLong(OFFSET_OFFSET);

Review comment: Good point. It is not really needed. Looks like it was added earlier to log the offset, but that log line was removed.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
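A short usage sketch for the class quoted above: only `RemoteLogInputStream` and `nextBatch()` come from the diff; the file name, the wrapper class, and the assumption that the stream is positioned at the first batch are mine.

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;

import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.RemoteLogInputStream;

// Iterate record batches from a remote segment stream until EOF.
public class RemoteSegmentScan {
    public static void main(String[] args) throws Exception {
        // Path is illustrative: any InputStream positioned at a batch boundary works.
        try (InputStream in = Files.newInputStream(Path.of("00000000000000000000.remote"))) {
            RemoteLogInputStream logStream = new RemoteLogInputStream(in);
            RecordBatch batch;
            while ((batch = logStream.nextBatch()) != null) { // null signals end of stream
                System.out.printf("batch baseOffset=%d sizeInBytes=%d%n",
                        batch.baseOffset(), batch.sizeInBytes());
            }
        }
    }
}
```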
[jira] [Comment Edited] (KAFKA-12635) Mirrormaker 2 offset sync is incorrect if the target partition is empty
[ https://issues.apache.org/jira/browse/KAFKA-12635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486576#comment-17486576 ]

Federico Valeri edited comment on KAFKA-12635 at 2/4/22, 8:51 AM:
--

I was able to reproduce the issue on Kafka 2.7.2 and 2.8.1, but not on 3.1.0.

State of the source cluster after producing/consuming 1 million records:
{code}
$ $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server :9090 --describe --topic my-topic
Topic: my-topic  PartitionCount: 3  ReplicationFactor: 3  Configs: min.insync.replicas=2,message.format.version=2.8-IV1
  Topic: my-topic  Partition: 0  Leader: 2  Replicas: 2,0,1  Isr: 2,0,1
  Topic: my-topic  Partition: 1  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
  Topic: my-topic  Partition: 2  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0

$ $KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server :9090 --describe --group my-group
Consumer group 'my-group' has no active members.

GROUP     TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
my-group  my-topic  0          332600          332600          0    -            -     -
my-group  my-topic  1          335510          335510          0    -            -     -
my-group  my-topic  2          331890          331890          0    -            -     -
{code}

State of the target cluster after MM2 has done its job (sync.group.offsets.enabled = true):
{code}
$ $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server :9093 --describe --topic my-topic
Topic: my-topic  PartitionCount: 3  ReplicationFactor: 3  Configs: min.insync.replicas=2,message.format.version=2.8-IV1
  Topic: my-topic  Partition: 0  Leader: 3  Replicas: 3,4,5  Isr: 3,4,5
  Topic: my-topic  Partition: 1  Leader: 4  Replicas: 4,5,3  Isr: 4,5,3
  Topic: my-topic  Partition: 2  Leader: 5  Replicas: 5,3,4  Isr: 5,3,4

$ $KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server :9093 --describe --group my-group
Consumer group 'my-group' has no active members.

GROUP     TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      CONSUMER-ID  HOST  CLIENT-ID
my-group  my-topic  0          332600          0               -332600  -            -     -
my-group  my-topic  1          335510          0               -335510  -            -     -
my-group  my-topic  2          331890          0               -331890  -            -     -
{code}

There is actually no need to set a custom value for retention.ms in order to trigger the issue.

was (Author: fvaleri):

I was able to reproduce the issue on Kafka 2.7.2 and 2.8.1, but not on 3.1.0.

State of the source cluster after producing/consuming 1 million records:
{code}
$ $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server :9090 --describe --topic my-topic
Topic: my-topic  PartitionCount: 3  ReplicationFactor: 3  Configs: min.insync.replicas=2,message.format.version=2.8-IV1
  Topic: my-topic  Partition: 0  Leader: 2  Replicas: 2,0,1  Isr: 2,0,1
  Topic: my-topic  Partition: 1  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
  Topic: my-topic  Partition: 2  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0

$ $KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server :9090 --describe --group my-group
Consumer group 'my-group' has no active members.

GROUP     TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
my-group  my-topic  0          332600          332600          0    -            -     -
my-group  my-topic  1          335510          335510          0    -            -     -
my-group  my-topic  2          331890          331890          0    -            -     -
{code}

State of the target cluster after MM2 has done its job (sync.group.offsets.enabled = true, replication.policy.class = io.strimzi.kafka.connect.mirror.IdentityReplicationPolicy):
{code}
$ $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server :9093 --describe --topic my-topic
Topic: my-topic  PartitionCount: 3  ReplicationFactor: 3  Configs: min.insync.replicas=2,message.format.version=2.8-IV1
  Topic: my-topic  Partition: 0  Leader: 3  Replicas: 3,4,5  Isr: 3,4,5
  Topic: my-topic  Partition: 1  Leader: 4  Replicas: 4,5,3  Isr: 4,5,3
  Topic: my-topic  Partition: 2  Leader: 5  Replicas: 5,3,4  Isr: 5,3,4

$ $KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server :9093 --describe --group my-
[jira] (KAFKA-12635) Mirrormaker 2 offset sync is incorrect if the target partition is empty
[ https://issues.apache.org/jira/browse/KAFKA-12635 ]

Federico Valeri deleted comment on KAFKA-12635:
-

was (Author: fvaleri):

I was able to reproduce the issue on Kafka 2.7.2 and 2.8.1, but not on 3.1.0.

State of the source cluster after producing/consuming 1 million records:
{code}
$ $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server :9090 --describe --topic my-topic
Topic: my-topic  PartitionCount: 3  ReplicationFactor: 3  Configs: min.insync.replicas=2,message.format.version=2.8-IV1
  Topic: my-topic  Partition: 0  Leader: 2  Replicas: 2,0,1  Isr: 2,0,1
  Topic: my-topic  Partition: 1  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
  Topic: my-topic  Partition: 2  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0

$ $KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server :9090 --describe --group my-group
Consumer group 'my-group' has no active members.

GROUP     TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
my-group  my-topic  0          332600          332600          0    -            -     -
my-group  my-topic  1          335510          335510          0    -            -     -
my-group  my-topic  2          331890          331890          0    -            -     -
{code}

State of the target cluster after MM2 has done its job (sync.group.offsets.enabled = true):
{code}
$ $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server :9093 --describe --topic my-topic
Topic: my-topic  PartitionCount: 3  ReplicationFactor: 3  Configs: min.insync.replicas=2,message.format.version=2.8-IV1
  Topic: my-topic  Partition: 0  Leader: 3  Replicas: 3,4,5  Isr: 3,4,5
  Topic: my-topic  Partition: 1  Leader: 4  Replicas: 4,5,3  Isr: 4,5,3
  Topic: my-topic  Partition: 2  Leader: 5  Replicas: 5,3,4  Isr: 5,3,4

$ $KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server :9093 --describe --group my-group
Consumer group 'my-group' has no active members.

GROUP     TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      CONSUMER-ID  HOST  CLIENT-ID
my-group  my-topic  0          332600          0               -332600  -            -     -
my-group  my-topic  1          335510          0               -335510  -            -     -
my-group  my-topic  2          331890          0               -331890  -            -     -
{code}

There is actually no need to set a custom value for retention.ms in order to trigger the issue.

> Mirrormaker 2 offset sync is incorrect if the target partition is empty
> ---
>
> Key: KAFKA-12635
> URL: https://issues.apache.org/jira/browse/KAFKA-12635
> Project: Kafka
> Issue Type: Bug
> Components: mirrormaker
> Affects Versions: 2.7.0
> Reporter: Frank Yi
> Assignee: Ning Zhang
> Priority: Major
>
> This bug occurs when using Mirrormaker with "sync.group.offsets.enabled = 
> true".
> If a source partition is empty, but the source consumer group's offset for 
> that partition is non-zero, then Mirrormaker sets the target consumer group's 
> offset for that partition to the literal, not translated, offset of the 
> source consumer group. This state can be reached if the source consumer group 
> consumed some records that were now deleted (like by a retention policy), or 
> if Mirrormaker replication is set to start at "latest". This bug causes the 
> target consumer group's lag for that partition to be negative and breaks 
> offset sync for that partition until lag is positive.
> The correct behavior when the source partition is empty would be to set the 
> target offset to the translated offset, not literal offset, which in this 
> case would always be 0.
> Original email thread on this issue: 
> https://lists.apache.org/thread.html/r7c54ee5f57227367b911d4abffa72781772d8dd3b72d75eb65ee19f7%40%3Cusers.kafka.apache.org%3E

-- This message was sent by Atlassian Jira (v8.20.1#820001)
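The fix the report describes -- translate the offset and clamp it to what exists in the target partition, rather than copying the source offset verbatim -- amounts to something like the sketch below. The types and names are illustrative, not MirrorMaker 2's actual classes:

```java
// Hypothetical sketch of safe checkpoint translation: never seek the target
// group beyond the target partition's log end offset.
public class OffsetTranslationSketch {
    record OffsetSync(long sourceOffset, long targetOffset) {}

    static long translateOffset(long sourceOffset, OffsetSync nearestSync, long targetLogEndOffset) {
        long translated = nearestSync.targetOffset() + (sourceOffset - nearestSync.sourceOffset());
        // Clamp to what exists downstream: an empty target partition
        // (log end offset 0) must yield 0, never the raw source offset.
        return Math.min(Math.max(translated, 0L), targetLogEndOffset);
    }
}
```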
[jira] [Commented] (KAFKA-13626) NullPointerException in Selector.pollSelectionKeys: channel is null
[ https://issues.apache.org/jira/browse/KAFKA-13626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486916#comment-17486916 ]

Daniel Häuser commented on KAFKA-13626:
---

[~Kvicii] I don't know exactly what happened there. As far as I know, the machines that ran the Kafka brokers were not able to communicate with each other at that point in time. Unfortunately I do not have any further access to them, nor to the configuration of the client. But no matter what caused this issue, I don't think that the Kafka client should throw a NullPointerException in any case. The exception should be something that the caller can handle accordingly (e.g. an IOException on networking issues so that you can retry). That's why I opened this bug report. However, I can also understand if this ticket will be closed, because it contains too little information to be able to reproduce the problem.

> NullPointerException in Selector.pollSelectionKeys: channel is null
> ---
>
> Key: KAFKA-13626
> URL: https://issues.apache.org/jira/browse/KAFKA-13626
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 2.7.1
> Reporter: Daniel Häuser
> Priority: Minor
>
> This NullPointerException occurred while we were having networking issues. 
> Unfortunately I cannot provide much more information than this stack trace 
> because this is all I got from our operations team.
> {code:java}
> java.lang.IllegalStateException: This error handler cannot process 
> 'java.lang.NullPointerException's; no record information is available
> at 
> org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:200)
> at 
> org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112)
> at 
> org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1599)
> at 
> org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210)
> at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at java.base/java.lang.Thread.run(Thread.java:831)
> Caused by: java.lang.NullPointerException: Cannot invoke 
> "org.apache.kafka.common.network.KafkaChannel.id()" because "channel" is null
> at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:516)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:563)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:257)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:480)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1257)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1226)
> at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
> at jdk.internal.reflect.GeneratedMethodAccessor128.invoke(Unknown 
> Source)
> at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:567)
> at 
> org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
> at 
> org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:208)
> at jdk.proxy2/jdk.proxy2.$Proxy137.poll(Unknown Source)
> at 
> org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1410)
> at 
> org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1249)
> at 
> org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161)
> ... 3 common frames omitted
[GitHub] [kafka] ijuma commented on a change in pull request #11722: KAFKA-13630: reduce amount of time that producer network thread holds batch queue lock
ijuma commented on a change in pull request #11722: URL: https://github.com/apache/kafka/pull/11722#discussion_r799428683

## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
## @@ -578,41 +587,45 @@ private boolean shouldStopDrainBatchesForPartition(ProducerBatch first, TopicPar

             } else {
                 if (shouldStopDrainBatchesForPartition(first, tp))
                     break;
+            }

-                boolean isTransactional = transactionManager != null && transactionManager.isTransactional();
-                ProducerIdAndEpoch producerIdAndEpoch =
-                        transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
-                ProducerBatch batch = deque.pollFirst();
-                if (producerIdAndEpoch != null && !batch.hasSequence()) {
-                    // If the producer id/epoch of the partition do not match the latest one
-                    // of the producer, we update it and reset the sequence. This should be
-                    // only done when all its in-flight batches have completed. This is guarantee
-                    // in `shouldStopDrainBatchesForPartition`.
-                    transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);
-
-                    // If the batch already has an assigned sequence, then we should not change the producer id and
-                    // sequence number, since this may introduce duplicates. In particular, the previous attempt
-                    // may actually have been accepted, and if we change the producer id and sequence here, this
-                    // attempt will also be accepted, causing a duplicate.
-                    //
-                    // Additionally, we update the next sequence number bound for the partition, and also have
-                    // the transaction manager track the batch so as to ensure that sequence ordering is maintained
-                    // even if we receive out of order responses.
-                    batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
-                    transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
-                    log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
-                            "{} being sent to partition {}", producerIdAndEpoch.producerId,
-                            producerIdAndEpoch.epoch, batch.baseSequence(), tp);
-
-                    transactionManager.addInFlightBatch(batch);
-                }
-                batch.close();
-                size += batch.records().sizeInBytes();
-                ready.add(batch);
+                // do the rest of the work by processing outside the lock
+                // close() is particularly expensive
+                batch = deque.pollFirst();
+            }

-                batch.drained(now);
-            }

+            boolean isTransactional = transactionManager != null && transactionManager.isTransactional();
+            ProducerIdAndEpoch producerIdAndEpoch =
+                    transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
+            if (producerIdAndEpoch != null && !batch.hasSequence()) {
+                // If the producer id/epoch of the partition do not match the latest one
+                // of the producer, we update it and reset the sequence. This should be
+                // only done when all its in-flight batches have completed. This is guarantee
+                // in `shouldStopDrainBatchesForPartition`.
+                transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);
+
+                // If the batch already has an assigned sequence, then we should not change the producer id and
+                // sequence number, since this may introduce duplicates. In particular, the previous attempt
+                // may actually have been accepted, and if we change the producer id and sequence here, this
+                // attempt will also be accepted, causing a duplicate.
+                //
+                // Additionally, we update the next sequence number bound for the partition, and also have
+                // the transaction manager track the batch so as to ensure that sequence ordering is maintained
+                // even if we receive out of order responses.
+                batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
+                transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);

Review comment: No, not a blocker.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
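The pattern this PR applies -- take only what you need from the deque while holding its lock, then do the expensive work (notably `close()` and the transactional bookkeeping) after releasing it -- in miniature. This is an illustrative sketch, not the actual accumulator code:

```java
import java.util.ArrayDeque;
import java.util.Deque;

class DrainSketch {
    static class Batch {
        void close() { /* expensive: compression footer, buffer flips, ... */ }
    }

    private final Deque<Batch> deque = new ArrayDeque<>();

    Batch drainOne() {
        final Batch batch;
        synchronized (deque) {
            // Keep the critical section tiny: just the queue mutation.
            batch = deque.pollFirst();
        }
        if (batch != null) {
            batch.close(); // expensive work runs without holding the lock
        }
        return batch;
    }
}
```

The benefit is that application threads appending records to the same partition's deque are blocked only for the duration of the poll, not for the whole close-and-account step.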
[GitHub] [kafka] ijuma commented on a change in pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm
ijuma commented on a change in pull request #11721: URL: https://github.com/apache/kafka/pull/11721#discussion_r799456068

## File path: clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java
## @@ -239,6 +239,66 @@ public void testDouble() throws IOException {
         assertDoubleSerde(Double.NEGATIVE_INFINITY, 0xFFF0L);
     }

+    private static int mathSizeOfUnsignedVarint(int value) {
+        int leadingZeros = Integer.numberOfLeadingZeros(value);
+        // return (38 - leadingZeros) / 7 + leadingZeros / 32;
+        int leadingZerosBelow38DividedBy7 = ((38 - leadingZeros) * 0b10010010010010011) >>> 19;
+        return leadingZerosBelow38DividedBy7 + (leadingZeros >>> 5);
+    }
+
+    @Test
+    public void testSizeOfUnsignedVarintMath() {
+        for (int i = 0; i < Integer.MAX_VALUE; i++) {

Review comment: Is this too slow for a unit test? Similar question for other tests that do a large range of values.

## File path: clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java
## @@ -239,6 +239,66 @@ public void testDouble() throws IOException {
         assertDoubleSerde(Double.NEGATIVE_INFINITY, 0xFFF0L);
     }

+    private static int mathSizeOfUnsignedVarint(int value) {
+        int leadingZeros = Integer.numberOfLeadingZeros(value);
+        // return (38 - leadingZeros) / 7 + leadingZeros / 32;
+        int leadingZerosBelow38DividedBy7 = ((38 - leadingZeros) * 0b10010010010010011) >>> 19;
+        return leadingZerosBelow38DividedBy7 + (leadingZeros >>> 5);
+    }
+
+    @Test
+    public void testSizeOfUnsignedVarintMath() {
+        for (int i = 0; i < Integer.MAX_VALUE; i++) {
+            final int actual = mathSizeOfUnsignedVarint(i);
+            final int expected = oldSizeOfUnsignedVarint(i);
+            assertEquals(expected, actual);
+        }
+    }
+
+    /**
+     * The old well-known implementation for sizeOfUnsignedVarint
+     */
+    private static int oldSizeOfUnsignedVarint(int value) {

Review comment: Maybe we can call this `simpleSizeOfUnsignedVarint` or something like that?

## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java
## @@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.util;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.common.utils.ByteUtils;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+@State(Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ByteUtilsBenchmark {
+    private int input;
+
+    @Setup(Level.Iteration)
+    public void setUp() {
+        input = ThreadLocalRandom.current().nextInt(2 * 1024 * 1024);
+    }
+
+    @Benchmark
+    @Fork(3)
+    @Warmup(iterations = 5, time = 1)
+    @Measurement(iterations = 10, time = 1)
+    public int testSizeOfUnsignedVarint() {

Review comment: Shall we also add methods for `sizeOfVarlong`?

## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java
## @@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
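For readers puzzling over the magic constant in `mathSizeOfUnsignedVarint` above: `0b10010010010010011` is 74899, and 74899 / 2^19 is just above 1/7, so the multiply-and-shift computes n / 7 exactly over the small range involved (0 <= 38 - leadingZeros <= 38), replacing an integer division with a cheaper multiply. A quick standalone check of that claim, separate from the PR's own exhaustive test:

```java
// Verify that (n * 74899) >>> 19 equals n / 7 for the whole input range
// that mathSizeOfUnsignedVarint can produce (n = 38 - leadingZeros <= 38).
public class MagicDivideCheck {
    public static void main(String[] args) {
        for (int n = 0; n <= 38; n++) {
            int fast = (n * 0b10010010010010011) >>> 19; // multiply-shift "divide by 7"
            if (fast != n / 7) {
                throw new AssertionError("mismatch at n=" + n);
            }
        }
        System.out.println("multiply-shift matches n/7 for 0..38");
    }
}
```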
[GitHub] [kafka] ijuma commented on a change in pull request #11722: KAFKA-13630: reduce amount of time that producer network thread holds batch queue lock
ijuma commented on a change in pull request #11722: URL: https://github.com/apache/kafka/pull/11722#discussion_r799462063

## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
## @@ -448,40 +448,48 @@ public ReadyCheckResult ready(Cluster cluster, long nowMs) {
         boolean exhausted = this.free.queued() > 0;
         for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
             Deque<ProducerBatch> deque = entry.getValue();
+
+            final ProducerBatch batch;
+            final long waitedTimeMs;
+            final boolean backingOff;
+            final boolean full;
+
+            // Collect as little as possible inside critical region, determine outcome after release
             synchronized (deque) {
-                // When producing to a large number of partitions, this path is hot and deques are often empty.
-                // We check whether a batch exists first to avoid the more expensive checks whenever possible.

Review comment: Why did we remove the information on why this is performance sensitive?

## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
## @@ -448,40 +448,48 @@ public ReadyCheckResult ready(Cluster cluster, long nowMs) {
         boolean exhausted = this.free.queued() > 0;
         for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
             Deque<ProducerBatch> deque = entry.getValue();
+
+            final ProducerBatch batch;
+            final long waitedTimeMs;
+            final boolean backingOff;
+            final boolean full;
+
+            // Collect as little as possible inside critical region, determine outcome after release
             synchronized (deque) {
-                // When producing to a large number of partitions, this path is hot and deques are often empty.
-                // We check whether a batch exists first to avoid the more expensive checks whenever possible.
-                ProducerBatch batch = deque.peekFirst();
-                if (batch != null) {
-                    TopicPartition part = entry.getKey();
-                    Node leader = cluster.leaderFor(part);
-                    if (leader == null) {
-                        // This is a partition for which leader is not known, but messages are available to send.
-                        // Note that entries are currently not removed from batches when deque is empty.
-                        unknownLeaderTopics.add(part.topic());
-                    } else if (!readyNodes.contains(leader) && !isMuted(part)) {
-                        long waitedTimeMs = batch.waitedTimeMs(nowMs);
-                        boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
-                        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
-                        boolean full = deque.size() > 1 || batch.isFull();
-                        boolean expired = waitedTimeMs >= timeToWaitMs;
-                        boolean transactionCompleting = transactionManager != null && transactionManager.isCompleting();
-                        boolean sendable = full
-                            || expired
-                            || exhausted
-                            || closed
-                            || flushInProgress()
-                            || transactionCompleting;
-                        if (sendable && !backingOff) {
-                            readyNodes.add(leader);
-                        } else {
-                            long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
-                            // Note that this results in a conservative estimate since an un-sendable partition may have
-                            // a leader that will later be found to have sendable data. However, this is good enough
-                            // since we'll just wake up and then sleep again for the remaining time.
-                            nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
-                        }
-                    }
-                }
+                batch = deque.peekFirst();

Review comment: Out of curiosity, do these make much of a difference? We can probably keep them, but it seems like the big problem was the `close` call being inside the lock (in the other method).

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13600) Rebalances while streams is in degraded state can cause stores to be reassigned and restore from scratch
[ https://issues.apache.org/jira/browse/KAFKA-13600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17487148#comment-17487148 ]

Bruno Cadonna commented on KAFKA-13600:
---

I am fine with discussing the improvement on the PR and not in a KIP. I actually realized that the other improvements to the assignment algorithm included changes to the public API and therefore a KIP was needed. For me it is just important that we look really carefully at the improvements because the assignment algorithm is a quite critical part of the system. Additionally, I did not want to discuss a totally new assignment algorithm. I just linked the information for general interest. Looking forward to the PR.

> Rebalances while streams is in degraded state can cause stores to be 
> reassigned and restore from scratch
>
> Key: KAFKA-13600
> URL: https://issues.apache.org/jira/browse/KAFKA-13600
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.1.0, 2.8.1, 3.0.0
> Reporter: Tim Patterson
> Priority: Major
>
> Consider this scenario:
> # A node is lost from the cluster.
> # A rebalance is kicked off with a new "target assignment" (i.e. the 
> rebalance is attempting to move a lot of tasks - see 
> https://issues.apache.org/jira/browse/KAFKA-10121).
> # The Kafka cluster is now a bit more sluggish from the increased load.
> # A rolling deploy happens, triggering rebalances; during the rebalance 
> processing continues but offsets can't be committed (or nodes are restarted 
> but fail to commit offsets).
> # The most caught up nodes now aren't within `acceptableRecoveryLag`, and so 
> the task is started in its "target assignment" location, restoring all state 
> from scratch and delaying further processing instead of using the "almost 
> caught up" node.
> We've hit this a few times, and having lots of state (~25TB worth) and being 
> heavy users of IQ, this is not ideal for us.
> While we can increase `acceptableRecoveryLag` to larger values to try to get 
> around this, that causes other issues (i.e. a warmup becoming active when it's 
> still quite far behind).
> The solution seems to be to balance "balanced assignment" with "most caught 
> up nodes".
> We've got a fork where we do just this and it's made a huge difference to the 
> reliability of our cluster.
> Our change is to simply use the most caught up node if the "target node" is 
> more than `acceptableRecoveryLag` behind, as sketched below.
> This gives up some of the load-balancing behaviour of the existing code, 
> but in practice it doesn't seem to matter too much.
> I guess maybe an algorithm that identified candidate nodes as those being 
> within `acceptableRecoveryLag` of the most caught up node might allow the best 
> of both worlds.
>
> Our fork is 
> https://github.com/apache/kafka/compare/trunk...tim-patterson:fix_balance_uncaughtup?expand=1
> (We also moved the capacity constraint code to happen after all the stateful 
> assignment to prioritise standby tasks over warmup tasks)
> Ideally we don't want to maintain a fork of kafka streams going forward so 
> are hoping to get a bit of discussion / agreement on the best way to handle 
> this.
> More than happy to contribute code / test different algos in our production 
> system or anything else to help with this issue.

-- This message was sent by Atlassian Jira (v8.20.1#820001)
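The fork's core rule described in the report -- keep the balance-preferred placement unless that node is more than `acceptableRecoveryLag` behind, in which case fall back to the most caught-up candidate -- can be sketched as follows. This is a hypothetical restatement, not the fork's actual code; names and types are illustrative:

```java
import java.util.Map;

public class AssignmentSketch {
    // lagByNode: restore lag of each candidate node for one stateful task.
    static String chooseOwner(Map<String, Long> lagByNode,
                              String balancedTarget,
                              long acceptableRecoveryLag) {
        String mostCaughtUp = lagByNode.entrySet().stream()
                .min(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey)
                .orElse(balancedTarget);
        long targetLag = lagByNode.getOrDefault(balancedTarget, Long.MAX_VALUE);
        // Keep the balanced placement unless it is too far behind to restore quickly.
        return targetLag <= acceptableRecoveryLag ? balancedTarget : mostCaughtUp;
    }
}
```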
[jira] [Commented] (KAFKA-13638) Slow KTable update when forwarding multiple values from transformer
[ https://issues.apache.org/jira/browse/KAFKA-13638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17487184#comment-17487184 ]

Bruno Cadonna commented on KAFKA-13638:
---

[~Lejon] How did you measure time with your test? I cannot really reproduce your numbers.

> Slow KTable update when forwarding multiple values from transformer
> ---
>
> Key: KAFKA-13638
> URL: https://issues.apache.org/jira/browse/KAFKA-13638
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.1.0, 3.0.0
> Reporter: Ulrik
> Priority: Major
> Attachments: KafkaTest.java
>
> I have a topology where I stream messages from an input topic, transform the 
> message to multiple messages (via context.forward), and then store those 
> messages in a KTable.
> Since upgrading from kafka-streams 2.8.1 to 3.1.0 I have noticed that my 
> tests take significantly longer to run.
>
> I have attached a test class to demonstrate my scenario. When running this 
> test with kafka-streams versions 2.8.1 and 3.1.0 I came up with the following 
> numbers:
>
> *Version 2.8.1*
> * one input message and one output message: 541 ms
> * 8 input messages and 30 output messages per input message (240 output 
> messages in total): 919 ms
>
> *Version 3.1.0*
> * one input message and one output message: 908 ms
> * 8 input messages and 30 output messages per input message (240 output 
> messages in total): 6 sec 94 ms
>
> Even when the transformer just transforms and forwards one input message to 
> one output message, the test takes approx. 400 ms longer to run.
> When transforming 8 input messages to 240 output messages it takes approx. 5 
> seconds longer.

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (KAFKA-13638) Slow KTable update when forwarding multiple values from transformer
[ https://issues.apache.org/jira/browse/KAFKA-13638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17487211#comment-17487211 ]

Ulrik commented on KAFKA-13638:
---

[~cadonna] I ran the test from within IntelliJ and just took the numbers from IntelliJ, which displays the running time for the test.

> Slow KTable update when forwarding multiple values from transformer
> ---
>
> Key: KAFKA-13638
> URL: https://issues.apache.org/jira/browse/KAFKA-13638
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.1.0, 3.0.0
> Reporter: Ulrik
> Priority: Major
> Attachments: KafkaTest.java
>
> I have a topology where I stream messages from an input topic, transform the 
> message to multiple messages (via context.forward), and then store those 
> messages in a KTable.
> Since upgrading from kafka-streams 2.8.1 to 3.1.0 I have noticed that my 
> tests take significantly longer to run.
>
> I have attached a test class to demonstrate my scenario. When running this 
> test with kafka-streams versions 2.8.1 and 3.1.0 I came up with the following 
> numbers:
>
> *Version 2.8.1*
> * one input message and one output message: 541 ms
> * 8 input messages and 30 output messages per input message (240 output 
> messages in total): 919 ms
>
> *Version 3.1.0*
> * one input message and one output message: 908 ms
> * 8 input messages and 30 output messages per input message (240 output 
> messages in total): 6 sec 94 ms
>
> Even when the transformer just transforms and forwards one input message to 
> one output message, the test takes approx. 400 ms longer to run.
> When transforming 8 input messages to 240 output messages it takes approx. 5 
> seconds longer.

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] hachikuji commented on a change in pull request #11691: KAFKA-13598: enable idempotence producer by default and validate the configs
hachikuji commented on a change in pull request #11691: URL: https://github.com/apache/kafka/pull/11691#discussion_r799691615

## File path: clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
## @@ -514,12 +512,11 @@ public ProducerConfig(Map props) {
     }

     boolean idempotenceEnabled() {
-        boolean userConfiguredIdempotence = this.originals().containsKey(ENABLE_IDEMPOTENCE_CONFIG);
         boolean userConfiguredTransactions = this.originals().containsKey(TRANSACTIONAL_ID_CONFIG);
-        boolean idempotenceEnabled = userConfiguredIdempotence && this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
-
-        if (!idempotenceEnabled && userConfiguredIdempotence && userConfiguredTransactions)
+        boolean idempotenceEnabled = this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
+        if (!idempotenceEnabled && userConfiguredTransactions)
             throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
+        return userConfiguredTransactions || idempotenceEnabled;

Review comment: nit: I think the check for `userConfiguredTransactions` is redundant now.

## File path: docs/upgrade.html
## @@ -19,6 +19,13 @@
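Why the reviewer calls the disjunct redundant: once the guard has thrown for the combination (transactional id set, idempotence disabled), any execution that reaches the return with `userConfiguredTransactions == true` must also have idempotence enabled, so the disjunction adds nothing. An illustrative self-contained restatement of that reasoning -- not the merged Kafka code, and using IllegalStateException in place of Kafka's ConfigException:

```java
import java.util.Map;

class IdempotenceCheckSketch {
    private final Map<String, Object> originals;
    private final boolean enableIdempotence;

    IdempotenceCheckSketch(Map<String, Object> originals, boolean enableIdempotence) {
        this.originals = originals;
        this.enableIdempotence = enableIdempotence;
    }

    boolean idempotenceEnabled() {
        boolean userConfiguredTransactions = originals.containsKey("transactional.id");
        if (!enableIdempotence && userConfiguredTransactions)
            throw new IllegalStateException("Cannot set a transactional.id without also enabling idempotence.");
        // Past the guard, userConfiguredTransactions == true implies enableIdempotence == true,
        // so "userConfiguredTransactions || enableIdempotence" collapses to "enableIdempotence".
        return enableIdempotence;
    }
}
```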
[GitHub] [kafka] ijuma commented on a change in pull request #11691: KAFKA-13598: enable idempotence producer by default and validate the configs
ijuma commented on a change in pull request #11691: URL: https://github.com/apache/kafka/pull/11691#discussion_r799697679

## File path: docs/upgrade.html
## @@ -19,6 +19,13 @@
[GitHub] [kafka] cmccabe commented on pull request #11659: KAFKA-13503: Validate broker configs for KRaft
cmccabe commented on pull request #11659: URL: https://github.com/apache/kafka/pull/11659#issuecomment-1030241564 Thanks for the PR, @dengziming. The broker validation needs to be done on the broker side, since the broker has access to information that the controller does not. For example, the broker knows whether a given file exists, and so can validate whether it is reasonable to set the SSL certificate to that file. KAFKA-13552 added this validation on the broker side. So I think we should close this PR as a duplicate. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-13503) Validate broker configs for KRaft
[ https://issues.apache.org/jira/browse/KAFKA-13503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe resolved KAFKA-13503. -- Fix Version/s: 3.2.0 Reviewer: Jose Armando Garcia Sancio Assignee: Colin McCabe (was: dengziming) Resolution: Fixed > Validate broker configs for KRaft > - > > Key: KAFKA-13503 > URL: https://issues.apache.org/jira/browse/KAFKA-13503 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Major > Labels: kip-500 > Fix For: 3.2.0 > > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13552) Unable to dynamically change broker log levels on KRaft
[ https://issues.apache.org/jira/browse/KAFKA-13552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Colin McCabe updated KAFKA-13552:
-
Fix Version/s: 3.2.0

> Unable to dynamically change broker log levels on KRaft
> ---
>
> Key: KAFKA-13552
> URL: https://issues.apache.org/jira/browse/KAFKA-13552
> Project: Kafka
> Issue Type: Bug
> Components: kraft
> Affects Versions: 3.1.0, 3.0.0
> Reporter: Ron Dagostino
> Assignee: Colin McCabe
> Priority: Major
> Fix For: 3.2.0
>
> It is currently not possible to dynamically change the log level in KRaft. 
> For example:
> kafka-configs.sh --bootstrap-server  --alter --add-config 
> "kafka.server.ReplicaManager=DEBUG" --entity-type broker-loggers 
> --entity-name 0
> Results in:
> org.apache.kafka.common.errors.InvalidRequestException: Unexpected resource 
> type BROKER_LOGGER.
> The code to process this request is in ZkAdminManager.alterLogLevelConfigs(). 
> This needs to be moved out of there, and the functionality has to be 
> processed locally on the broker instead of being forwarded to the KRaft 
> controller.
> It is also an open question as to how we can dynamically alter log levels for 
> a remote KRaft controller. Connecting directly to it is one possible 
> solution, but that may not be desirable since generally connecting directly 
> to the controller is not necessary. The ticket for this particular aspect of 
> the issue is https://issues.apache.org/jira/browse/KAFKA-13502

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (KAFKA-13503) Validate broker configs for KRaft
[ https://issues.apache.org/jira/browse/KAFKA-13503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17487221#comment-17487221 ]

Colin McCabe commented on KAFKA-13503:
--

The PR for KAFKA-13552 added broker-side validation as well. So I think we can close this now.

{code}
commit 68a19539cf1fcd86787960d0010b672d0d611b91
Author: Colin Patrick McCabe
Date:   Fri Jan 21 16:00:21 2022 -0800

    KAFKA-13552: Fix BROKER and BROKER_LOGGER in KRaft (#11657)
{code}

> Validate broker configs for KRaft
> -
>
> Key: KAFKA-13503
> URL: https://issues.apache.org/jira/browse/KAFKA-13503
> Project: Kafka
> Issue Type: Improvement
> Reporter: Colin McCabe
> Assignee: dengziming
> Priority: Major
> Labels: kip-500
>

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] guozhangwang commented on a change in pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
guozhangwang commented on a change in pull request #11424: URL: https://github.com/apache/kafka/pull/11424#discussion_r799712506

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
## @@ -1034,14 +1037,15 @@ private static Metrics getMetrics(final StreamsConfig config, final Time time, f

     final StreamThread streamThread;
     synchronized (changeThreadCount) {
         final int threadIdx = getNextThreadIndex();
-        final int numLiveThreads = getNumLiveStreamThreads();
-        final long cacheSizePerThread = getCacheSizePerThread(numLiveThreads + 1);
-        log.info("Adding StreamThread-{}, there will now be {} live threads and the new cache size per thread is {}",
-                 threadIdx, numLiveThreads + 1, cacheSizePerThread);
-        resizeThreadCache(cacheSizePerThread);
         // Creating thread should hold the lock in order to avoid duplicate thread index.
         // If the duplicate index happen, the metadata of thread may be duplicate too.
-        streamThread = createAndAddStreamThread(cacheSizePerThread, threadIdx);
+        // Also, we create the new thread with initial values of cache size and max buffer size as 0
+        // and then resize them later
+        streamThread = createAndAddStreamThread(0L, 0L, threadIdx);
+        final int numLiveThreads = getNumLiveStreamThreads();
+        resizeThreadCacheAndBufferMemory(numLiveThreads + 1);

Review comment:
> I was wondering: why do we need to do this post-construction resizing?

The main motivation is to consolidate the resizing of the thread cache and buffer within a single call. More details can be found in this comment thread: https://github.com/apache/kafka/pull/11424#discussion_r774058848

I suggested we initialize the new thread with a value of 0 -- it should not be a random value -- and then resize (at that point we have the correct number of threads to divide by).

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-9154) ProducerId generation should be managed by the Controller
[ https://issues.apache.org/jira/browse/KAFKA-9154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Colin McCabe resolved KAFKA-9154.
-
Fix Version/s: 3.1.0
Resolution: Fixed

We implemented KIP-730 in Kafka 3.1. Closing this.

> ProducerId generation should be managed by the Controller
> -
>
> Key: KAFKA-9154
> URL: https://issues.apache.org/jira/browse/KAFKA-9154
> Project: Kafka
> Issue Type: New Feature
> Components: core
> Reporter: Viktor Somogyi-Vass
> Assignee: David Arthur
> Priority: Major
> Labels: kip-500
> Fix For: 3.1.0
>
> Currently producerIds are maintained in Zookeeper, but in the future we'd like 
> them to be managed by the controller quorum in an internal topic.
> The reason for storing this in Zookeeper was that this must be unique across 
> the cluster. In this task it should be refactored such that the 
> TransactionManager turns to the Controller for a ProducerId; the Controller 
> connects to Zookeeper to acquire this ID. Since ZK is the single source of 
> truth and the PID won't be cached anywhere, it should be safe (just one extra 
> hop added).

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (KAFKA-9837) New RPC for notifying controller of failed replica
[ https://issues.apache.org/jira/browse/KAFKA-9837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17487226#comment-17487226 ]

Colin McCabe commented on KAFKA-9837:
-

I'm sorry that there hasn't been any activity on this one in a while (there was more on the PR). The issue here is that we don't want to add an RPC which will be O(num_partitions_in_dir). It will just become unworkable as directory sizes increase, the same as the old controller RPCs which were like this. So we need to start keeping metadata on the controller about which partitions are in which directory. However, that would require a KIP which we haven't had time to do yet...

> New RPC for notifying controller of failed replica
> --
>
> Key: KAFKA-9837
> URL: https://issues.apache.org/jira/browse/KAFKA-9837
> Project: Kafka
> Issue Type: New Feature
> Components: controller, core
> Reporter: David Arthur
> Assignee: dengziming
> Priority: Major
> Labels: kip-500
> Fix For: 3.2.0
>
> This is the tracking ticket for 
> [KIP-589|https://cwiki.apache.org/confluence/display/KAFKA/KIP-589+Add+API+to+update+Replica+state+in+Controller]. 
> For the bridge release, brokers should no longer use ZooKeeper to notify the 
> controller that a log dir has failed. It should instead use an RPC mechanism.

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (KAFKA-10724) Command to run single quorum in raft is missing "--config" parameters.
[ https://issues.apache.org/jira/browse/KAFKA-10724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17487230#comment-17487230 ]

Colin McCabe commented on KAFKA-10724:
--

The command has been renamed to {{raft/bin/test-kraft-server-start.sh}}, and it does have a {{--config}} flag now:

{code}
raft/bin/test-kraft-server-start.sh
Standalone raft server for performance testing

Option         Description
------         -----------
--config       Required configured file
--help         Print usage information.
--record-size  The size of each record (default: 256)
--throughput   The number of records per second the leader will write to the
               metadata topic (default: 5000)
--version      Display Kafka version.
{code}

(To be clear, this is just an internal testing tool, not something that end-users should use)

Closing.

> Command to run single quorum in raft is missing "--config" parameters.
> --
>
> Key: KAFKA-10724
> URL: https://issues.apache.org/jira/browse/KAFKA-10724
> Project: Kafka
> Issue Type: Bug
> Components: core, docs
> Reporter: huldar chen
> Priority: Major
> Labels: kip-500
>
> When I run "bin/test-raft-server-start.sh config/raft.properties", I get an 
> error:
> [2020-11-14 23:00:38,742] ERROR Exiting Kafka due to fatal exception 
> (kafka.tools.TestRaftServer$)
> org.apache.kafka.common.config.ConfigException: Missing required 
> configuration "zookeeper.connect" which has no default value.
> at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:478)
> at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:468)
> at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108)
> at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:142)
> at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1314)
> at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1317)
> at kafka.tools.TestRaftServer$.main(TestRaftServer.scala:607)
> at kafka.tools.TestRaftServer.main(TestRaftServer.scala)
> The correct command is "./bin/test-raft-server-start.sh --config 
> ./config/raft.properties"

-- This message was sent by Atlassian Jira (v8.20.1#820001)
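Putting the flags from the usage output above together, a performance run might look like the following; the config path and flag values here are illustrative, not prescribed anywhere in the thread:

{code}
raft/bin/test-kraft-server-start.sh --config config/kraft.properties \
    --record-size 512 --throughput 10000
{code}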
[jira] [Assigned] (KAFKA-10724) Command to run single quorum in raft is missing "--config" parameters.
[ https://issues.apache.org/jira/browse/KAFKA-10724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Colin McCabe reassigned KAFKA-10724:
-
Assignee: Jason Gustafson

> Command to run single quorum in raft is missing "--config" parameters.
> --
>
> Key: KAFKA-10724
> URL: https://issues.apache.org/jira/browse/KAFKA-10724
> Project: Kafka
> Issue Type: Bug
> Components: core, docs
> Reporter: huldar chen
> Assignee: Jason Gustafson
> Priority: Major
> Labels: kip-500
>
> When I run "bin/test-raft-server-start.sh config/raft.properties", I get an 
> error:
> [2020-11-14 23:00:38,742] ERROR Exiting Kafka due to fatal exception 
> (kafka.tools.TestRaftServer$)
> org.apache.kafka.common.config.ConfigException: Missing required 
> configuration "zookeeper.connect" which has no default value.
> at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:478)
> at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:468)
> at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108)
> at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:142)
> at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1314)
> at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1317)
> at kafka.tools.TestRaftServer$.main(TestRaftServer.scala:607)
> at kafka.tools.TestRaftServer.main(TestRaftServer.scala)
> The correct command is "./bin/test-raft-server-start.sh --config 
> ./config/raft.properties"

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Resolved] (KAFKA-10724) Command to run single quorum in raft is missing "--config" parameters.
[ https://issues.apache.org/jira/browse/KAFKA-10724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Colin McCabe resolved KAFKA-10724.
--
Resolution: Fixed

> Command to run single quorum in raft is missing "--config" parameters.
> --
>
> Key: KAFKA-10724
> URL: https://issues.apache.org/jira/browse/KAFKA-10724
> Project: Kafka
> Issue Type: Bug
> Components: core, docs
> Reporter: huldar chen
> Priority: Major
> Labels: kip-500
>
> When I run "bin/test-raft-server-start.sh config/raft.properties", I get an 
> error:
> [2020-11-14 23:00:38,742] ERROR Exiting Kafka due to fatal exception 
> (kafka.tools.TestRaftServer$)
> org.apache.kafka.common.config.ConfigException: Missing required 
> configuration "zookeeper.connect" which has no default value.
> at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:478)
> at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:468)
> at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108)
> at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:142)
> at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1314)
> at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1317)
> at kafka.tools.TestRaftServer$.main(TestRaftServer.scala:607)
> at kafka.tools.TestRaftServer.main(TestRaftServer.scala)
> The correct command is "./bin/test-raft-server-start.sh --config 
> ./config/raft.properties"

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Resolved] (KAFKA-12209) Add the timeline data structures for the KIP-631 controller
[ https://issues.apache.org/jira/browse/KAFKA-12209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe resolved KAFKA-12209. -- Fix Version/s: 2.8.0 Assignee: Colin McCabe Resolution: Fixed > Add the timeline data structures for the KIP-631 controller > --- > > Key: KAFKA-12209 > URL: https://issues.apache.org/jira/browse/KAFKA-12209 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Major > Labels: kip-500 > Fix For: 2.8.0 > > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Resolved] (KAFKA-12214) Generated code does not include UUID or struct fields in its toString output
[ https://issues.apache.org/jira/browse/KAFKA-12214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe resolved KAFKA-12214. -- Resolution: Fixed > Generated code does not include UUID or struct fields in its toString output > > > Key: KAFKA-12214 > URL: https://issues.apache.org/jira/browse/KAFKA-12214 > Project: Kafka > Issue Type: Bug > Components: generator >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Major > Labels: kip-500 > > The generated code does not include UUID or struct fields in its toString > output. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Resolved] (KAFKA-12271) Expose consistent Raft metadata to components
[ https://issues.apache.org/jira/browse/KAFKA-12271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe resolved KAFKA-12271. -- Fix Version/s: 3.0.0 Resolution: Fixed > Expose consistent Raft metadata to components > - > > Key: KAFKA-12271 > URL: https://issues.apache.org/jira/browse/KAFKA-12271 > Project: Kafka > Issue Type: Improvement >Reporter: David Arthur >Assignee: Colin McCabe >Priority: Minor > Labels: kip-500 > Fix For: 3.0.0 > > > In Raft clusters, we need a way of exposing consistent metadata to components > such as ReplicaManager, LogManager, etc. > This is done through a new class named MetadataImage. As Raft metadata > records are processed, new MetadataImage-s are built and provided to the > MetadataCache atomically. This avoids readers seeing any partially > materialized metadata. -- This message was sent by Atlassian Jira (v8.20.1#820001)
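For readers new to the pattern the ticket describes, here is a minimal sketch of an atomically swapped immutable snapshot, the idea behind MetadataImage; the class and field names below are illustrative only, not the actual Kafka API.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

final class MetadataSnapshotCache {
    // Immutable snapshot: a complete image is built per batch of records.
    record Image(long offset, Map<String, Integer> topicPartitionCounts) { }

    private final AtomicReference<Image> current =
            new AtomicReference<>(new Image(-1L, Map.of()));

    // Writer: build the next image fully, then publish it in a single step,
    // so readers never observe a partially applied batch.
    void publish(Image next) {
        current.set(next);
    }

    // Readers: one atomic read yields a self-consistent snapshot.
    Image latest() {
        return current.get();
    }
}
```

The key property is that a reader can hold one `Image` across many lookups and observe a single consistent point in time, which is what "avoids readers seeing any partially materialized metadata" means in practice.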
[jira] [Commented] (KAFKA-12421) Improve controller's atomic grouping
[ https://issues.apache.org/jira/browse/KAFKA-12421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17487233#comment-17487233 ] Colin McCabe commented on KAFKA-12421: -- We did introduce a distinction between atomic and non-atomic writes, so let's close this for now. > Improve controller's atomic grouping > > > Key: KAFKA-12421 > URL: https://issues.apache.org/jira/browse/KAFKA-12421 > Project: Kafka > Issue Type: Improvement > Components: controller >Reporter: José Armando García Sancio >Assignee: HaiyuanZhao >Priority: Major > Labels: kip-500 > > The current controller implementation atomically appends to the metadata log > by making sure that all required records are on the same batch. The > controller groups all of the records that result from an RPC into one batch. > Some of the RPCs are: > # Client quota changes > # Configuration changes > # Feature changes > # Topic creation > This is good enough for correctness but it is more aggressive than necessary. > For example, for topic creation since errors are reported independently, the > controller only needs to guarantee that all of the records for one topic are > committed atomically. -- This message was sent by Atlassian Jira (v8.20.1#820001)
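To make the proposal concrete, a hedged sketch of per-topic atomic grouping follows; the record type and method names are invented for illustration and do not match the controller's real classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class AtomicGroupingSketch {
    record MetadataRecord(String topic, String payload) { }

    // One batch per topic: each inner list must be appended to the log
    // atomically, but different topics may land in different log batches,
    // since topic-creation errors are reported per topic anyway.
    static List<List<MetadataRecord>> groupByTopic(List<MetadataRecord> fromRpc) {
        Map<String, List<MetadataRecord>> byTopic = new LinkedHashMap<>();
        for (MetadataRecord r : fromRpc)
            byTopic.computeIfAbsent(r.topic(), t -> new ArrayList<>()).add(r);
        return new ArrayList<>(byTopic.values());
    }
}
```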
[jira] [Resolved] (KAFKA-12421) Improve controller's atomic grouping
[ https://issues.apache.org/jira/browse/KAFKA-12421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe resolved KAFKA-12421. -- Assignee: Jose Armando Garcia Sancio (was: HaiyuanZhao) Resolution: Fixed > Improve controller's atomic grouping > > > Key: KAFKA-12421 > URL: https://issues.apache.org/jira/browse/KAFKA-12421 > Project: Kafka > Issue Type: Improvement > Components: controller >Reporter: José Armando García Sancio >Assignee: Jose Armando Garcia Sancio >Priority: Major > Labels: kip-500 > > The current controller implementation atomically appends to the metadata log > by making sure that all required records are on the same batch. The > controller groups all of the records that result from an RPC into one batch. > Some of the RPCs are: > # Client quota changes > # Configuration changes > # Feature changes > # Topic creation > This is good enough for correctness but it is more aggressive than necessary. > For example, for topic creation since errors are reported independently, the > controller only needs to guarantee that all of the records for one topic are > committed atomically. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Assigned] (KAFKA-12502) Quorum controller should return topic configs in CreateTopic response
[ https://issues.apache.org/jira/browse/KAFKA-12502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe reassigned KAFKA-12502: Assignee: Colin McCabe (was: Ryan Dielhenn) > Quorum controller should return topic configs in CreateTopic response > - > > Key: KAFKA-12502 > URL: https://issues.apache.org/jira/browse/KAFKA-12502 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Colin McCabe >Priority: Major > Labels: kip-500 > > Configs were added to the response in version 5. > {code} > { "name": "Configs", "type": "[]CreatableTopicConfigs", "versions": > "5+", "nullableVersions": "5+", "ignorable": true, > "about": "Configuration of the topic.", "fields": [ > { "name": "Name", "type": "string", "versions": "5+", > "about": "The configuration name." }, > { "name": "Value", "type": "string", "versions": "5+", > "nullableVersions": "5+", > "about": "The configuration value." }, > { "name": "ReadOnly", "type": "bool", "versions": "5+", > "about": "True if the configuration is read-only." }, > { "name": "ConfigSource", "type": "int8", "versions": "5+", > "default": "-1", "ignorable": true, > "about": "The configuration source." }, > { "name": "IsSensitive", "type": "bool", "versions": "5+", > "about": "True if this configuration is sensitive." } > ]} > {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13646) Implement KIP-801: KRaft authorizer
Colin McCabe created KAFKA-13646: Summary: Implement KIP-801: KRaft authorizer Key: KAFKA-13646 URL: https://issues.apache.org/jira/browse/KAFKA-13646 Project: Kafka Issue Type: Improvement Reporter: Colin McCabe Assignee: Colin McCabe -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Resolved] (KAFKA-13193) Replica manager doesn't update partition state when transitioning from leader to follower with unknown leader
[ https://issues.apache.org/jira/browse/KAFKA-13193?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe resolved KAFKA-13193. -- Resolution: Fixed > Replica manager doesn't update partition state when transitioning from leader > to follower with unknown leader > - > > Key: KAFKA-13193 > URL: https://issues.apache.org/jira/browse/KAFKA-13193 > Project: Kafka > Issue Type: Bug > Components: kraft, replication >Reporter: Jose Armando Garcia Sancio >Assignee: Jose Armando Garcia Sancio >Priority: Major > Labels: kip-500 > > This issue applies to both the ZK and KRaft implementations of the replica > manager. In the rare case when a replica transitions from leader to follower > with no leader, the partition state is not updated. > This is because when handling makeFollowers, the ReplicaManager only updates > the partition state if the leader is alive. The solution is to always > transition to follower but not start the fetcher thread if the leader is > unknown or not alive. -- This message was sent by Atlassian Jira (v8.20.1#820001)
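A minimal sketch of the fix as the ticket describes it, with invented names (this is not ReplicaManager's actual API): the state transition always happens, and only the fetcher start is gated on leader liveness.

```java
import java.util.Set;

class FollowerTransitionSketch {
    interface Partition {
        void transitionToFollower(Integer leaderId);
        void startFetcherTo(int leaderId);
    }

    static void makeFollower(Partition partition, Integer leaderId, Set<Integer> aliveBrokers) {
        partition.transitionToFollower(leaderId);      // always update state
        if (leaderId != null && aliveBrokers.contains(leaderId)) {
            partition.startFetcherTo(leaderId);        // only if leader is known and alive
        }
    }
}
```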
[GitHub] [kafka] jasonk000 commented on pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm
jasonk000 commented on pull request #11721: URL: https://github.com/apache/kafka/pull/11721#issuecomment-1030278393 Thanks @ijuma , I believe I've sorted these out in [e891ebf](https://github.com/apache/kafka/pull/11721/commits/e891ebfd46ed97ab1c6face21f3d7f6565734a77). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jasonk000 commented on a change in pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm
jasonk000 commented on a change in pull request #11721: URL: https://github.com/apache/kafka/pull/11721#discussion_r799747533 ## File path: clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java ## @@ -239,6 +239,66 @@ public void testDouble() throws IOException { assertDoubleSerde(Double.NEGATIVE_INFINITY, 0xFFF0L); } +private static int mathSizeOfUnsignedVarint(int value) { +int leadingZeros = Integer.numberOfLeadingZeros(value); +// return (38 - leadingZeros) / 7 + leadingZeros / 32; +int leadingZerosBelow38DividedBy7 = ((38 - leadingZeros) * 0b10010010010010011) >>> 19; +return leadingZerosBelow38DividedBy7 + (leadingZeros >>> 5); +} + +@Test +public void testSizeOfUnsignedVarintMath() { +for (int i = 0; i < Integer.MAX_VALUE; i++) { Review comment: The tests I have now run in ~0.6 seconds. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jasonk000 commented on a change in pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm
jasonk000 commented on a change in pull request #11721: URL: https://github.com/apache/kafka/pull/11721#discussion_r799748066 ## File path: clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java ## @@ -239,6 +239,66 @@ public void testDouble() throws IOException { assertDoubleSerde(Double.NEGATIVE_INFINITY, 0xFFF0L); } +private static int mathSizeOfUnsignedVarint(int value) { +int leadingZeros = Integer.numberOfLeadingZeros(value); +// return (38 - leadingZeros) / 7 + leadingZeros / 32; +int leadingZerosBelow38DividedBy7 = ((38 - leadingZeros) * 0b10010010010010011) >>> 19; +return leadingZerosBelow38DividedBy7 + (leadingZeros >>> 5); +} + +@Test +public void testSizeOfUnsignedVarintMath() { +for (int i = 0; i < Integer.MAX_VALUE; i++) { +final int actual = mathSizeOfUnsignedVarint(i); +final int expected = oldSizeOfUnsignedVarint(i); +assertEquals(expected, actual); +} +} + +/** + * The old well-known implementation for sizeOfUnsignedVarint + */ +private static int oldSizeOfUnsignedVarint(int value) { Review comment: I nested these inside the test case, which I think is cleaner. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
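For anyone following the varint discussion, the closed-form size computation quoted in the tests above can be checked in isolation; the class below is a standalone illustration, not code from the PR.

```java
// Standalone check of the closed-form varint size formula from the PR's test:
// with bits = 32 - leadingZeros, (38 - leadingZeros) / 7 equals ceil(bits / 7.0)
// for nonzero values, and leadingZeros / 32 adds the one byte needed for value == 0.
public class VarintSizeDemo {
    static int sizeOfUnsignedVarint(int value) {
        int leadingZeros = Integer.numberOfLeadingZeros(value);
        return (38 - leadingZeros) / 7 + leadingZeros / 32;
    }

    public static void main(String[] args) {
        // Boundary values where the encoded size changes; -1 is max unsigned.
        int[] boundaries = {0, 1, 127, 128, 16383, 16384, -1};
        for (int v : boundaries) {
            System.out.println(Integer.toUnsignedString(v) + " -> "
                    + sizeOfUnsignedVarint(v) + " byte(s)");
        }
    }
}
```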
[GitHub] [kafka] dajac commented on pull request #11543: Update release.py
dajac commented on pull request #11543: URL: https://github.com/apache/kafka/pull/11543#issuecomment-1030295682 @shharrnam Thanks for the PR. Could you explain why we would need to add this new line? That does not seem necessary to me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac closed pull request #11594: How to define the quantity of consumption groups
dajac closed pull request #11594: URL: https://github.com/apache/kafka/pull/11594 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #11594: How to define the quantity of consumption groups
dajac commented on pull request #11594: URL: https://github.com/apache/kafka/pull/11594#issuecomment-1030299168 @ayu-programer It depends on your use case. You could consume all the topics from the same group if the application needs to process all of them together. Otherwise, you can, for instance, define a group per use case and subscribe only to the topics required for that use case. I hope this helps. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
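As a concrete illustration of this advice, a minimal consumer for one use case might look like the sketch below; the topic and group names are made up.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class BillingConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "billing");  // one group per use case
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Subscribe only to the topics this use case actually needs.
            consumer.subscribe(List.of("orders", "payments"));
            consumer.poll(Duration.ofSeconds(1)).forEach(r -> System.out.println(r.value()));
        }
    }
}
```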
[GitHub] [kafka] dajac commented on pull request #11353: KAFKA-13322: Reducing amount of garbage that gets generated during a poll operation
dajac commented on pull request #11353: URL: https://github.com/apache/kafka/pull/11353#issuecomment-1030306522 @mprusakov Are you still interested in doing this? I can help review the PR once the existing comments are addressed. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #10562: MINOR: Update tests to include the 2.8.0 release
dajac commented on pull request #10562: URL: https://github.com/apache/kafka/pull/10562#issuecomment-1030315113 Closing as this was done by another PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac closed pull request #10562: MINOR: Update tests to include the 2.8.0 release
dajac closed pull request #10562: URL: https://github.com/apache/kafka/pull/10562 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #9963: MINOR: Extract ApiVersions logic from the `SocketServer` to the `KafkaApis`
dajac commented on pull request #9963: URL: https://github.com/apache/kafka/pull/9963#issuecomment-1030320056 Closing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac closed pull request #9963: MINOR: Extract ApiVersions logic from the `SocketServer` to the `KafkaApis`
dajac closed pull request #9963: URL: https://github.com/apache/kafka/pull/9963 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac closed pull request #9402: KAFKA-10588 update console consumer arguments for KIP-629
dajac closed pull request #9402: URL: https://github.com/apache/kafka/pull/9402 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #9402: KAFKA-10588 update console consumer arguments for KIP-629
dajac commented on pull request #9402: URL: https://github.com/apache/kafka/pull/9402#issuecomment-1030324045 This was done by https://github.com/apache/kafka/pull/11008. Closing it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #9289: Dev/pasriva/json acl
dajac commented on pull request #9289: URL: https://github.com/apache/kafka/pull/9289#issuecomment-1030325227 I am not sure what this PR is doing. It seems to be a mistake. Closing it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac closed pull request #9289: Dev/pasriva/json acl
dajac closed pull request #9289: URL: https://github.com/apache/kafka/pull/9289 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac closed pull request #9085: MINOR: Support java.util.Optional in the auto-generated protocol
dajac closed pull request #9085: URL: https://github.com/apache/kafka/pull/9085 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #9085: MINOR: Support java.util.Optional in the auto-generated protocol
dajac commented on pull request #9085: URL: https://github.com/apache/kafka/pull/9085#issuecomment-1030326392 PR is outdated. Closing it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac closed pull request #7456: KAFKA-8997; Make Errors a first class type in the auto-generated protocol.
dajac closed pull request #7456: URL: https://github.com/apache/kafka/pull/7456 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jasonk000 commented on a change in pull request #11722: KAFKA-13630: reduce amount of time that producer network thread holds batch queue lock
jasonk000 commented on a change in pull request #11722: URL: https://github.com/apache/kafka/pull/11722#discussion_r799796771 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ## @@ -448,40 +448,48 @@ public ReadyCheckResult ready(Cluster cluster, long nowMs) { boolean exhausted = this.free.queued() > 0; for (Map.Entry> entry : this.batches.entrySet()) { Deque deque = entry.getValue(); + +final ProducerBatch batch; +final long waitedTimeMs; +final boolean backingOff; +final boolean full; + +// Collect as little as possible inside critical region, determine outcome after release synchronized (deque) { -// When producing to a large number of partitions, this path is hot and deques are often empty. -// We check whether a batch exists first to avoid the more expensive checks whenever possible. Review comment: I attempted to capture the sentiment with this comment, given the re-flowed logic: ``` // Collect as little as possible inside critical region, determine outcome after release ``` I'll add something more descriptive. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mprusakov closed pull request #11353: KAFKA-13322: Reducing amount of garbage that gets generated during a poll operation
mprusakov closed pull request #11353: URL: https://github.com/apache/kafka/pull/11353 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mprusakov commented on pull request #11353: KAFKA-13322: Reducing amount of garbage that gets generated during a poll operation
mprusakov commented on pull request #11353: URL: https://github.com/apache/kafka/pull/11353#issuecomment-1030340572 Unfortunately I have realized that this fix is a drop in the ocean, and the further changes needed are much more intrusive. This change alone did not meet my client's requirements, so I've implemented my own version using your lower-level API; this change is therefore no longer needed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-13322) Java client produces a large amount of garbage during a poll
[ https://issues.apache.org/jira/browse/KAFKA-13322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michail resolved KAFKA-13322. - Resolution: Won't Fix Having done the analysis, the changes required to achieve this are far too intrusive, so I decided not to proceed with this. > Java client produces a large amount of garbage during a poll > > > Key: KAFKA-13322 > URL: https://issues.apache.org/jira/browse/KAFKA-13322 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 3.0.0 >Reporter: Michail >Priority: Minor > Labels: new-rebalance-should-fix > > The Java Kafka consumer creates multiple collections during a single poll > call: in my test system I have a consumer that polls a topic with 100 partitions, and even though no messages are coming through, the code allocates > around 100M per 5 minutes. > > I've investigated the allocations and the biggest ones can be easily avoided > by moving them to the instance level, something that can be done as > KafkaConsumer is not thread safe. The purpose of this Jira is to get rid of most > of them by applying either this or a similar approach. -- This message was sent by Atlassian Jira (v8.20.1#820001)
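For context, the kind of change the ticket proposed looks roughly like the sketch below: hoisting a per-poll allocation to an instance field, which is safe only because KafkaConsumer is documented as not thread-safe. The class is a toy illustration, not the consumer's real internals.

```java
import java.util.ArrayList;
import java.util.List;

class PollScratchSketch {
    // Hoisted: lives for the consumer's lifetime, cleared on each poll.
    private final List<String> scratch = new ArrayList<>();

    List<String> poll() {
        scratch.clear();                       // reuse the backing array
        scratch.add("record-from-this-poll");  // fill with this poll's results
        return scratch;                        // callers must not retain this list
    }
}
```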
[GitHub] [kafka] hachikuji commented on a change in pull request #11667: MINOR; Enable Kraft in ApiVersionTest
hachikuji commented on a change in pull request #11667: URL: https://github.com/apache/kafka/pull/11667#discussion_r799823851 ## File path: core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala ## @@ -17,41 +17,35 @@ package kafka.server -import kafka.test.{ClusterConfig, ClusterInstance} import org.apache.kafka.common.message.ApiVersionsRequestData import org.apache.kafka.common.protocol.{ApiKeys, Errors} -import org.apache.kafka.common.requests.ApiVersionsRequest -import kafka.test.annotation.ClusterTest -import kafka.test.junit.ClusterTestExtensions +import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse} import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.BeforeEach -import org.junit.jupiter.api.extension.ExtendWith +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource -@ExtendWith(value = Array(classOf[ClusterTestExtensions])) Review comment: I think `ClusterTestExtensions` was intended to test both modes. Was it not working? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
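For comparison, the parameterized style the diff above switches to looks roughly like this JUnit 5 sketch; the quorum plumbing itself is elided and the string values are illustrative.

```java
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static org.junit.jupiter.api.Assertions.assertTrue;

class QuorumParameterizedSketch {
    @ParameterizedTest
    @ValueSource(strings = {"zk", "kraft"})
    void testApiVersionsRequest(String quorum) {
        // A real test would bring up a cluster of the given type and send
        // an ApiVersionsRequest; here we only demonstrate the mechanism.
        assertTrue(quorum.equals("zk") || quorum.equals("kraft"));
    }
}
```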
[GitHub] [kafka] hachikuji commented on pull request #11685: Update dependencies.gradle
hachikuji commented on pull request #11685: URL: https://github.com/apache/kafka/pull/11685#issuecomment-1030368363 @shubhamsingh002 Can you clarify your intent here? The upgrade to log4j 2 is not compatible and we are not planning to release from 0.10 in any case as far as I know. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #11722: KAFKA-13630: reduce amount of time that producer network thread holds batch queue lock
ijuma commented on a change in pull request #11722: URL: https://github.com/apache/kafka/pull/11722#discussion_r799828526 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ## @@ -448,40 +448,48 @@ public ReadyCheckResult ready(Cluster cluster, long nowMs) { boolean exhausted = this.free.queued() > 0; for (Map.Entry> entry : this.batches.entrySet()) { Deque deque = entry.getValue(); + +final ProducerBatch batch; +final long waitedTimeMs; +final boolean backingOff; +final boolean full; + +// Collect as little as possible inside critical region, determine outcome after release synchronized (deque) { -// When producing to a large number of partitions, this path is hot and deques are often empty. -// We check whether a batch exists first to avoid the more expensive checks whenever possible. Review comment: I think the useful context that was removed is: > When producing to a large number of partitions, this path is hot and deques are often empty. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on pull request #11646: KAFKA-13566: producer exponential backoff implementation for KIP-580
hachikuji commented on pull request #11646: URL: https://github.com/apache/kafka/pull/11646#issuecomment-1030373051 @showuon Thanks for the PR and apologies for the delay. It looks to me like this is covering both the consumer and producer? Would it be possible to separate into two PRs? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji closed pull request #11685: Update dependencies.gradle
hachikuji closed pull request #11685: URL: https://github.com/apache/kafka/pull/11685 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on pull request #11685: Update dependencies.gradle
hachikuji commented on pull request #11685: URL: https://github.com/apache/kafka/pull/11685#issuecomment-1030374498 I am going to go ahead and close this. Please feel free to reopen after you have updated the description to explain what you are trying to do. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #11620: MINOR: check for raft threads in verifyNoUnexpectedThreads
hachikuji commented on a change in pull request #11620: URL: https://github.com/apache/kafka/pull/11620#discussion_r799835234 ## File path: core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala ## @@ -340,6 +340,7 @@ abstract class QuorumTestHarness extends Logging { object QuorumTestHarness { val ZkClientEventThreadSuffix = "-EventThread" + val RaftClientThreadPrefix = "raft" Review comment: Was there a plan to use this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm
ijuma commented on a change in pull request #11721: URL: https://github.com/apache/kafka/pull/11721#discussion_r799833387 ## File path: clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java ## @@ -413,15 +424,20 @@ public static int sizeOfVarint(int value) { * Number of bytes needed to encode a long in variable-length format. * * @param value The signed value + * @see #sizeOfUnsignedVarint(int) */ public static int sizeOfVarlong(long value) { long v = (value << 1) ^ (value >> 63); -int bytes = 1; -while ((v & 0xff80L) != 0L) { -bytes += 1; -v >>>= 7; -} -return bytes; + +// For implementation notes @see #sizeOfUnsignedVarint(int) + +// Similar logic is applied to allow for 64bit input -> 1-9byte output. + +// return (70 - leadingZeros) / 7 + leadingZeros / 64; Review comment: Nit: not sure we need the empty lines between comments here, seems pretty readable without it. ## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java ## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.jmh.util; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.common.utils.ByteUtils; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +@State(Scope.Benchmark) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ByteUtilsBenchmark { +private int input; + +@Setup(Level.Iteration) +public void setUp() { +input = ThreadLocalRandom.current().nextInt(2 * 1024 * 1024); +} + +@Benchmark +@Fork(3) +@Warmup(iterations = 5, time = 1) +@Measurement(iterations = 10, time = 1) +public int testSizeOfUnsignedVarint() { +return ByteUtils.sizeOfUnsignedVarint(input); +} + +@Benchmark +@Fork(3) +@Warmup(iterations = 5, time = 1) +@Measurement(iterations = 10, time = 1) +public int testSizeOfUnsignedVarintSimple() { +int value = input; +int bytes = 1; +while ((value & 0xff80) != 0L) { +bytes += 1; +value >>>= 7; +} +return bytes; +} + +@Benchmark +@Fork(3) +@Warmup(iterations = 5, time = 1) +@Measurement(iterations = 10, time = 1) +public int testSizeOfVarlong() { Review comment: I was going to ask why we're using the `test` prefix for a benchmark, but then I realized that many of the kafka benchmarks do that and I somehow didn't notice. :) Given that, it seems fine to leave it like this for now. ## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java ## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.
[GitHub] [kafka] ijuma commented on a change in pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm
ijuma commented on a change in pull request #11721: URL: https://github.com/apache/kafka/pull/11721#discussion_r799833387 ## File path: clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java ## @@ -413,15 +424,20 @@ public static int sizeOfVarint(int value) { * Number of bytes needed to encode a long in variable-length format. * * @param value The signed value + * @see #sizeOfUnsignedVarint(int) */ public static int sizeOfVarlong(long value) { long v = (value << 1) ^ (value >> 63); -int bytes = 1; -while ((v & 0xff80L) != 0L) { -bytes += 1; -v >>>= 7; -} -return bytes; + +// For implementation notes @see #sizeOfUnsignedVarint(int) + +// Similar logic is applied to allow for 64bit input -> 1-9byte output. + +// return (70 - leadingZeros) / 7 + leadingZeros / 64; Review comment: Nit: not sure we need the empty lines between comments here, seems pretty readable without them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #11688: KAFKA-13435; Static membership protocol should let the leader skip assignment (KIP-814)
hachikuji commented on a change in pull request #11688: URL: https://github.com/apache/kafka/pull/11688#discussion_r799847085 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -696,8 +698,12 @@ public void handle(JoinGroupResponse joinResponse, RequestFuture fut private RequestFuture onJoinLeader(JoinGroupResponse joinResponse) { try { // perform the leader synchronization and send back the assignment for the group -Map groupAssignment = performAssignment(joinResponse.data().leader(), joinResponse.data().protocolName(), -joinResponse.data().members()); +Map groupAssignment = performAssignment( +joinResponse.data().leader(), +joinResponse.data().protocolName(), +joinResponse.data().members(), +joinResponse.data().skipAssignment() Review comment: I think a comment about this would be helpful. An obvious question is why do we still call `performAssignment` when `skipAssignment` is set. It's useful to remember that we still need to propagate the leader and member state to the coordinator implementation. ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java ## @@ -211,7 +211,15 @@ protected void onJoinComplete(int generation, String memberId, String protocol, } @Override -protected Map performAssignment(String leaderId, String protocol, List allMemberMetadata) { +protected Map performAssignment(String leaderId, +String protocol, + List allMemberMetadata, +Boolean skipAssignment) { +// Connect does not support static membership so skipping the +// assignment should never happen in practice. +if (skipAssignment) +return Collections.emptyMap(); Review comment: Would it make sense to raise an exception instead? ## File path: core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala ## @@ -1791,4 +1793,33 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertTrue(records2.count() == 1 && records2.records(tp).asScala.head.offset == 1, "Expected consumer2 to consume one message from offset 1, which is the committed offset of consumer1") } + @Test Review comment: nit: add newline ## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java ## @@ -1550,6 +1573,34 @@ public void testMetadataChangeTriggersRebalance() { assertTrue(coordinator.rejoinNeededOrPending()); } +@Test +public void testStaticLeaderRejoinsGroupAndCanTriggersRebalance() { +// ensure metadata is up-to-date for leader +subscriptions.subscribe(singleton(topic1), rebalanceListener); +client.updateMetadata(metadataResponse); + +client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); +coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + +// the leader is responsible for picking up metadata changes and forcing a group rebalance. +// note that `partitionAssignor.prepare` is not called therefore calling `partitionAssignor.assign` +// will throw a IllegalStateException. this indirectly verifies that `assign` is correctly skipped. +Map> memberSubscriptions = singletonMap(consumerId, singletonList(topic1)); Review comment: Could we have a case where the other consumers are subscribed to a topic that this consumer is not also subscribed to? 
## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ## @@ -642,6 +648,12 @@ private void maybeUpdateGroupSubscription(String assignorName, updateGroupSubscription(allSubscribedTopics); isLeader = true; +assignmentSnapshot = metadataSnapshot; + +if (skipAssignment) +return Collections.emptyMap(); Review comment: Can we add some logging for this case? ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -198,11 +198,13 @@ public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig, * @param leaderId The id of the leader (which is this member) * @param protocol The protocol selected by the coordinator * @param allMemberMetadata Metadata from all members of the group + * @param skipAssignment True if leader must skip running the assignor * @return A map from each member to their state assignment */ protected abstract Map performAssignment(String leaderId,
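A rough sketch of the control flow under discussion (hypothetical names, not the actual patch): even with `skipAssignment` set, the leader callback still runs so leader and member state are propagated, and only the assignor invocation is skipped.

```java
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;

abstract class CoordinatorSketch {
    protected boolean isLeader;

    protected Map<String, ByteBuffer> performAssignment(String leaderId,
                                                        String protocol,
                                                        List<String> allMemberMetadata,
                                                        boolean skipAssignment) {
        isLeader = true;                    // record leader-side state first
        if (skipAssignment) {
            // KIP-814: the broker reuses the existing assignment, so the
            // static leader returns nothing rather than running the assignor.
            return Collections.emptyMap();
        }
        return computeAssignment(allMemberMetadata);
    }

    abstract Map<String, ByteBuffer> computeAssignment(List<String> members);
}
```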
[GitHub] [kafka] lbradstreet commented on a change in pull request #11620: MINOR: check for raft threads in verifyNoUnexpectedThreads
lbradstreet commented on a change in pull request #11620: URL: https://github.com/apache/kafka/pull/11620#discussion_r799864255 ## File path: core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala ## @@ -340,6 +340,7 @@ abstract class QuorumTestHarness extends Logging { object QuorumTestHarness { val ZkClientEventThreadSuffix = "-EventThread" + val RaftClientThreadPrefix = "raft" Review comment: I think I extracted this from another PR where I made a more substantial change. I'll figure out what I had in mind. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji opened a new pull request #11734: MINOR: Do not use optional args in `ProducerStateManager`
hachikuji opened a new pull request #11734: URL: https://github.com/apache/kafka/pull/11734 We allowed `maxProducerIdExpirationMs` and `time` to be optional in the `ProducerStateManager` constructor. We generally frown on optional arguments since it is too easy to overlook them. In this case, I thought it was especially dangerous because `maxTransactionTimeoutMs` used the same type as `maxProducerIdExpirationMs`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
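A toy Java illustration of the hazard the PR removes (the real class is Scala; the names and default value here are invented): with a hidden default, a call site can silently conflate two same-typed millisecond parameters, whereas requiring both keeps every call site explicit.

```java
class ProducerStateManagerSketch {
    final int maxTransactionTimeoutMs;
    final int maxProducerIdExpirationMs;

    // The risky shape: one of two look-alike ints is optional.
    ProducerStateManagerSketch(int maxTransactionTimeoutMs) {
        this(maxTransactionTimeoutMs, 60 * 60 * 1000); // hidden default
    }

    // The safer shape the PR moves toward: both values must be spelled out.
    ProducerStateManagerSketch(int maxTransactionTimeoutMs, int maxProducerIdExpirationMs) {
        this.maxTransactionTimeoutMs = maxTransactionTimeoutMs;
        this.maxProducerIdExpirationMs = maxProducerIdExpirationMs;
    }
}
```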
[GitHub] [kafka] hachikuji merged pull request #11694: MINOR: deleteHorizonMs update to documentation and DumpLogSegments tool
hachikuji merged pull request #11694: URL: https://github.com/apache/kafka/pull/11694 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jasonk000 commented on a change in pull request #11722: KAFKA-13630: reduce amount of time that producer network thread holds batch queue lock
jasonk000 commented on a change in pull request #11722: URL: https://github.com/apache/kafka/pull/11722#discussion_r799904384 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ## @@ -448,40 +448,48 @@ public ReadyCheckResult ready(Cluster cluster, long nowMs) { boolean exhausted = this.free.queued() > 0; for (Map.Entry> entry : this.batches.entrySet()) { Deque deque = entry.getValue(); + +final ProducerBatch batch; +final long waitedTimeMs; +final boolean backingOff; +final boolean full; + +// Collect as little as possible inside critical region, determine outcome after release synchronized (deque) { -// When producing to a large number of partitions, this path is hot and deques are often empty. -// We check whether a batch exists first to avoid the more expensive checks whenever possible. -ProducerBatch batch = deque.peekFirst(); -if (batch != null) { -TopicPartition part = entry.getKey(); -Node leader = cluster.leaderFor(part); -if (leader == null) { -// This is a partition for which leader is not known, but messages are available to send. -// Note that entries are currently not removed from batches when deque is empty. -unknownLeaderTopics.add(part.topic()); -} else if (!readyNodes.contains(leader) && !isMuted(part)) { -long waitedTimeMs = batch.waitedTimeMs(nowMs); -boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs; -long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs; -boolean full = deque.size() > 1 || batch.isFull(); -boolean expired = waitedTimeMs >= timeToWaitMs; -boolean transactionCompleting = transactionManager != null && transactionManager.isCompleting(); -boolean sendable = full -|| expired -|| exhausted -|| closed -|| flushInProgress() -|| transactionCompleting; -if (sendable && !backingOff) { -readyNodes.add(leader); -} else { -long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0); -// Note that this results in a conservative estimate since an un-sendable partition may have -// a leader that will later be found to have sendable data. However, this is good enough -// since we'll just wake up and then sleep again for the remaining time. -nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs); -} -} +batch = deque.peekFirst(); Review comment: It is a small amount. The two changes (#11721) reduce CPU of the `send()` path by about 10% on a busy app, so the impact depends on the rest of the application. This application produces a high event rate, so about 1.5-2% of overall system CPU is saved. For a 1000-machine cluster it will end up at ~15-20 machines. Before: [flamegraph image] After: [flamegraph image] -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jasonk000 commented on a change in pull request #11722: KAFKA-13630: reduce amount of time that producer network thread holds batch queue lock
jasonk000 commented on a change in pull request #11722: URL: https://github.com/apache/kafka/pull/11722#discussion_r799905219 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ## @@ -448,40 +448,48 @@ public ReadyCheckResult ready(Cluster cluster, long nowMs) { boolean exhausted = this.free.queued() > 0; for (Map.Entry> entry : this.batches.entrySet()) { Deque deque = entry.getValue(); + +final ProducerBatch batch; +final long waitedTimeMs; +final boolean backingOff; +final boolean full; + +// Collect as little as possible inside critical region, determine outcome after release synchronized (deque) { -// When producing to a large number of partitions, this path is hot and deques are often empty. -// We check whether a batch exists first to avoid the more expensive checks whenever possible. Review comment: [f15fdef](https://github.com/apache/kafka/pull/11722/commits/f15fdef3241a17f86af44c6dac04ba651d629426) Added: ``` // This loop is especially hot with large partition counts. // We are careful to only perform the minimum required inside the // synchronized block, as this lock is also used to synchronize producer threads // attempting to append() to a partition/batch. synchronized (deque) { // Deques are often empty in this path, esp with large partition counts, // so we exit early if we can. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
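The overall shape of the pattern discussed in this thread, as a self-contained toy (simplified stand-ins, not the RecordAccumulator code): snapshot the minimum state while holding the deque lock, then evaluate readiness on local copies after releasing it.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class ReadyCheckSketch {
    record Batch(long createdMs, boolean full) {
        long waitedTimeMs(long nowMs) { return nowMs - createdMs; }
    }

    private final Deque<Batch> deque = new ArrayDeque<>();

    void append(Batch b) {
        synchronized (deque) {   // same lock that producer threads contend on
            deque.addLast(b);
        }
    }

    boolean isReady(long nowMs, long lingerMs) {
        final Batch batch;
        final long waitedTimeMs;
        final boolean full;
        // Critical region: only cheap reads, no policy decisions.
        synchronized (deque) {
            batch = deque.peekFirst();
            if (batch == null)
                return false;    // common case with many partitions: exit cheaply
            waitedTimeMs = batch.waitedTimeMs(nowMs);
            full = deque.size() > 1 || batch.full();
        }
        // Decision logic runs on local copies after the lock is released.
        return full || waitedTimeMs >= lingerMs;
    }
}
```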
[jira] [Updated] (KAFKA-12984) Cooperative sticky assignor can get stuck with invalid SubscriptionState input metadata
[ https://issues.apache.org/jira/browse/KAFKA-12984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-12984: --- Fix Version/s: 2.5.2 2.6.2 2.7.1 > Cooperative sticky assignor can get stuck with invalid SubscriptionState > input metadata > --- > > Key: KAFKA-12984 > URL: https://issues.apache.org/jira/browse/KAFKA-12984 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: A. Sophie Blee-Goldman >Assignee: A. Sophie Blee-Goldman >Priority: Blocker > Fix For: 2.5.2, 2.7.1, 2.6.2, 2.8.1, 3.0.0 > > Attachments: image-2021-10-25-11-53-40-221.png, > log-events-viewer-result-kafka.numbers, logs-insights-results-kafka.csv, > logs-insights-results-kafka.numbers > > > Some users have reported seeing their consumer group become stuck in the > CompletingRebalance phase when using the cooperative-sticky assignor. Based > on the request metadata we were able to deduce that multiple consumers were > reporting the same partition(s) in their "ownedPartitions" field of the > consumer protocol. Since this is an invalid state, the input causes the > cooperative-sticky assignor to detect that something is wrong and throw an > IllegalStateException. If the consumer application is set up to simply retry, > this will cause the group to appear to hang in the rebalance state. > The "ownedPartitions" field is encoded based on the ConsumerCoordinator's > SubscriptionState, which was assumed to always be up to date. However there > may be cases where the consumer has dropped out of the group but fails to > clear the SubscriptionState, allowing it to report some partitions as owned > that have since been reassigned to another member. > We should (a) fix the sticky assignment algorithm to resolve cases of > improper input conditions by invalidating the "ownedPartitions" in cases of > double ownership, and (b) shore up the ConsumerCoordinator logic to better > handle rejoining the group and keeping its internal state consistent. See > KAFKA-12983 for more details on (b) -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-12984) Cooperative sticky assignor can get stuck with invalid SubscriptionState input metadata
[ https://issues.apache.org/jira/browse/KAFKA-12984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-12984: --- Fix Version/s: 2.7.2 2.6.3 (was: 2.7.1) (was: 2.6.2) > Cooperative sticky assignor can get stuck with invalid SubscriptionState > input metadata > --- > > Key: KAFKA-12984 > URL: https://issues.apache.org/jira/browse/KAFKA-12984 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: A. Sophie Blee-Goldman >Assignee: A. Sophie Blee-Goldman >Priority: Blocker > Fix For: 2.5.2, 2.6.3, 2.7.2, 2.8.1, 3.0.0 > > Attachments: image-2021-10-25-11-53-40-221.png, > log-events-viewer-result-kafka.numbers, logs-insights-results-kafka.csv, > logs-insights-results-kafka.numbers > > > Some users have reported seeing their consumer group become stuck in the > CompletingRebalance phase when using the cooperative-sticky assignor. Based > on the request metadata we were able to deduce that multiple consumers were > reporting the same partition(s) in their "ownedPartitions" field of the > consumer protocol. Since this is an invalid state, the input causes the > cooperative-sticky assignor to detect that something is wrong and throw an > IllegalStateException. If the consumer application is set up to simply retry, > this will cause the group to appear to hang in the rebalance state. > The "ownedPartitions" field is encoded based on the ConsumerCoordinator's > SubscriptionState, which was assumed to always be up to date. However there > may be cases where the consumer has dropped out of the group but fails to > clear the SubscriptionState, allowing it to report some partitions as owned > that have since been reassigned to another member. > We should (a) fix the sticky assignment algorithm to resolve cases of > improper input conditions by invalidating the "ownedPartitions" in cases of > double ownership, and (b) shore up the ConsumerCoordinator logic to better > handle rejoining the group and keeping its internal state consistent. See > KAFKA-12983 for more details on (b) -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] ableegoldman commented on a change in pull request #11712: WIP: Put failed tasks to end of processing list
ableegoldman commented on a change in pull request #11712: URL: https://github.com/apache/kafka/pull/11712#discussion_r799919908 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java ## @@ -270,6 +278,23 @@ Task task(final TaskId taskId) { return readOnlyActiveTasks; } +List orderedActiveTasks() { +return Collections.unmodifiableList(orderedActiveTasks); +} + +void moveActiveTasksToTailFor(final String topologyName) { Review comment: I told Bruno to do this -- we can discuss next week if you still have questions, but the motivation is to avoid letting the tasks of a topology/query get way out of sync with each other. This can result in missed output, for example when a processor upstream of one side of a join is paused for a long time while the other side continues processing records and trying to join them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
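A small sketch of the reordering being discussed, with illustrative types only (not the actual Tasks class): a stable move-to-tail, so the affected topology's tasks keep their relative order but are processed after everything else.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class TaskOrderSketch {
    record Task(String topologyName, int id) { }

    static void moveTasksToTailFor(List<Task> orderedActiveTasks, String topologyName) {
        List<Task> moved = new ArrayList<>();
        Iterator<Task> it = orderedActiveTasks.iterator();
        while (it.hasNext()) {
            Task t = it.next();
            if (t.topologyName().equals(topologyName)) {
                it.remove();     // pull out in encounter order
                moved.add(t);
            }
        }
        orderedActiveTasks.addAll(moved);  // re-append at the tail
    }
}
```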
[GitHub] [kafka] jasonk000 commented on pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm
jasonk000 commented on pull request #11721: URL: https://github.com/apache/kafka/pull/11721#issuecomment-1030489854 I've addressed the comments; I'll wait and see what spotbugs says this time. Locally it's all clear; on CI it flags issues in code not related to this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-13346) Kafka Streams fails due to RocksDB Locks Not Available Exception
[ https://issues.apache.org/jira/browse/KAFKA-13346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang resolved KAFKA-13346.
-----------------------------------
    Resolution: Not A Problem

> Kafka Streams fails due to RocksDB Locks Not Available Exception
> -----------------------------------------------------------------
>
>                 Key: KAFKA-13346
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13346
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.0
>            Reporter: Amit Gupta
>            Priority: Major
>
> Hello,
> We are using Kafka Streams, and we observe that sometimes, on some of the hosts running the streams application, the Kafka Streams instance fails with an unexpected exception. We are running with 40 stream threads per host and 20 hosts in total.
> Can someone please help with what the root cause can be here?
>
> org.apache.kafka.streams.errors.ProcessorStateException: Error opening store state-store at location .
>     at org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:214) ~[kafka-streams-2.6.0.jar:?]
>     at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:188) ~[kafka-streams-2.6.0.jar:?]
>     at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:224) ~[kafka-streams-2.6.0.jar:?]
>     at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) ~[kafka-streams-2.6.0.jar:?]
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:42) ~[kafka-streams-2.6.0.jar:?]
>     at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) ~[kafka-streams-2.6.0.jar:?]
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$0(MeteredKeyValueStore.java:101) ~[kafka-streams-2.6.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) ~[kafka-streams-2.6.0.jar:?]
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:101) ~[kafka-streams-2.6.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:199) ~[kafka-streams-2.6.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:76) ~[kafka-streams-2.6.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:95) ~[kafka-streams-2.6.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:426) ~[kafka-streams-2.6.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:660) ~[kafka-streams-2.6.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551) ~[kafka-streams-2.6.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510) ~[kafka-streams-2.6.0.jar:?]
> Caused by: org.rocksdb.RocksDBException: lock : ./0_468/rocksdb/state-store/LOCK: No locks available
>     at org.rocksdb.RocksDB.open(Native Method) ~[rocksdbjni-5.18.3.jar:?]
>     at org.rocksdb.RocksDB.open(RocksDB.java:286) ~[rocksdbjni-5.18.3.jar:?]
>     at org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:211) ~[kafka-streams-2.6.0.jar:?]
>     ... 15 more
>
> Sometimes I also see this exception:
> org.apache.kafka.streams.errors.ProcessorStateException: Error opening store state-store at location ./0_433/rocksdb/state-store
>     at org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:214) ~[kafka-streams-2.6.0.jar:?]
>     at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:188) ~[kafka-streams-2.6.0.jar:?]
>     at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:224) ~[kafka-streams-2.6.0.jar:?]
>     at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) ~[kafka-streams-2.6.0.jar:?]
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:42) ~[kafka-streams-2.6.0.jar:?]
>     at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) ~[kafka-streams-2.6.0.jar:?]
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$0(MeteredKeyValueStore.java:101) ~[kafka-streams-2.6.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
[jira] [Commented] (KAFKA-13346) Kafka Streams fails due to RocksDB Locks Not Available Exception
[ https://issues.apache.org/jira/browse/KAFKA-13346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17487405#comment-17487405 ]

Guozhang Wang commented on KAFKA-13346:
---------------------------------------
I'm closing this ticket for now since we have not heard back from Amit. Please feel free to re-open it if there are new updates.

> Kafka Streams fails due to RocksDB Locks Not Available Exception
> -----------------------------------------------------------------
>
>                 Key: KAFKA-13346
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13346
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.0
>            Reporter: Amit Gupta
>            Priority: Major
[GitHub] [kafka] Kvicii commented on pull request #11543: Update release.py
Kvicii commented on pull request #11543: URL: https://github.com/apache/kafka/pull/11543#issuecomment-1030526684 @dajac This is unnecessary, I think this PR can be closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #11586: KAFKA-13516: Connection level metrics are not closed
ijuma commented on pull request #11586: URL: https://github.com/apache/kafka/pull/11586#issuecomment-1030531283 Cc @apovzner @splett2 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #11579: KAFKA-13518: Update gson dependency
ijuma commented on pull request #11579: URL: https://github.com/apache/kafka/pull/11579#issuecomment-1030531381 Thanks for the PR. Seems like the new version has more false positives. Do you know if they intend to fix those? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma closed pull request #11543: Update release.py
ijuma closed pull request #11543: URL: https://github.com/apache/kafka/pull/11543 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #11469: MINOR: disable zookeeper.sasl.client to avoid false error
ijuma commented on pull request #11469: URL: https://github.com/apache/kafka/pull/11469#issuecomment-1030532433 Any reason why this wasn't merged? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma closed pull request #11377: MINOR: Use try-with-resource to close the stream opened by Files.list()
ijuma closed pull request #11377: URL: https://github.com/apache/kafka/pull/11377 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #11376: KAFKA-13342: LeaderAndIsrRequest should not be sent for topic queued for deletion
ijuma commented on pull request #11376: URL: https://github.com/apache/kafka/pull/11376#issuecomment-1030532699 Are you planning to address the comments @lbradstreet ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #11137: KAFKA-13133 Replace EasyMock and PowerMock with Mockito for AbstractHerderTest
ijuma commented on pull request #11137: URL: https://github.com/apache/kafka/pull/11137#issuecomment-1030532879 @wycc are you planning to address the comments? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #11159: MINOR: Fix logging in ClusterControlManager
ijuma commented on pull request #11159: URL: https://github.com/apache/kafka/pull/11159#issuecomment-1030533071 @jsancio are you going to merge this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #11161: MINOR: Remove node from API versions cache on NetworkClient.close(nodeId)
ijuma commented on pull request #11161: URL: https://github.com/apache/kafka/pull/11161#issuecomment-1030533145 @rajinisivaram friendly ping. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #11204: KAFKA-13188 - Release the memory back into MemoryPool
ijuma commented on a change in pull request #11204: URL: https://github.com/apache/kafka/pull/11204#discussion_r78166

## File path: clients/src/main/java/org/apache/kafka/clients/ClientResponseWithFinalize.java ##

@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients;
+
+import java.nio.ByteBuffer;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+
+/**
+ * This is a decorator for ClientResponse used to verify (at finalization time) that any underlying memory for this
+ * response has been returned to the pool. To be used only as a debugging aide.
+ */
+public class ClientResponseWithFinalize extends ClientResponse {
+    private final LogContext logContext;
+
+    public ClientResponseWithFinalize(RequestHeader requestHeader, RequestCompletionHandler callback,
+            String destination, long createdTimeMs, long receivedTimeMs, boolean disconnected,
+            UnsupportedVersionException versionMismatch, AuthenticationException authenticationException,
+            AbstractResponse responseBody, MemoryPool memoryPool, ByteBuffer responsePayload, LogContext logContext) {
+        super(requestHeader, callback, destination, createdTimeMs, receivedTimeMs, disconnected, versionMismatch,
+            authenticationException, responseBody, memoryPool, responsePayload);
+        this.logContext = logContext;
+    }
+
+    private Logger getLogger() {
+        if (logContext != null) {
+            return logContext.logger(ClientResponseWithFinalize.class);
+        }
+        return log;
+    }
+
+    protected void checkAndForceBufferRelease() {
+        if (memoryPool != null && responsePayload != null) {
+            getLogger().error("ByteBuffer[{}] not released. Ref Count: {}. RequestType: {}", responsePayload.position(),
+                refCount.get(), this.requestHeader.apiKey());
+            memoryPool.release(responsePayload);
+            responsePayload = null;
+        }
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+        super.finalize();
+        checkAndForceBufferRelease();
+    }

Review comment: Finalizers are deprecated, so it would be good to avoid them.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
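[Editor's note] For a leak check like the one ijuma flags above, `java.lang.ref.Cleaner` (Java 9+) is the usual replacement for a finalizer. The following is only a minimal sketch of that idea, not code from the PR; the `LeakDetector` and `ReleaseCheck` names are hypothetical, and the real version would release into the `MemoryPool` instead of printing.

```java
import java.lang.ref.Cleaner;
import java.nio.ByteBuffer;

// Hypothetical sketch: a Cleaner-based leak check instead of finalize().
// The cleaning action must not hold a reference to the tracked object itself,
// so the buffer is captured in a separate Runnable.
public class LeakDetector {
    private static final Cleaner CLEANER = Cleaner.create();

    private static final class ReleaseCheck implements Runnable {
        private ByteBuffer payload; // stand-in for responsePayload

        ReleaseCheck(ByteBuffer payload) {
            this.payload = payload;
        }

        @Override
        public void run() {
            if (payload != null) {
                System.err.println("ByteBuffer[" + payload.capacity() + "] was never released");
                payload = null; // a real implementation would call memoryPool.release(payload) here
            }
        }
    }

    public static Cleaner.Cleanable register(Object response, ByteBuffer payload) {
        // The action runs when 'response' becomes phantom reachable, or on an explicit clean().
        return CLEANER.register(response, new ReleaseCheck(payload));
    }
}
```

On the normal release path the caller would invoke `Cleanable.clean()` itself after returning the buffer, so the background action only fires for genuinely leaked responses.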
[GitHub] [kafka] ijuma commented on pull request #11229: KAFKA-12961; Verify group generation in `DelayedJoin`
ijuma commented on pull request #11229: URL: https://github.com/apache/kafka/pull/11229#issuecomment-1030533521 @dajac is this still relevant? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #11238: MINOR: Fix force kill of KRaft colocated controllers in system tests
ijuma commented on pull request #11238: URL: https://github.com/apache/kafka/pull/11238#issuecomment-1030533632 Cc @jsancio @cmccabe -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon closed pull request #11661: [WIP] MINOR: shutdown thread test
showuon closed pull request #11661: URL: https://github.com/apache/kafka/pull/11661 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #11646: [WIP] KAFKA-13566: producer exponential backoff implementation for KIP-580
showuon commented on pull request #11646: URL: https://github.com/apache/kafka/pull/11646#issuecomment-1030567555 @hachikuji, thanks for the comment. Yes, I've separated producer and consumer into 2 PRs (and there's actually a 3rd one for the adminClient); it's just that there are still some changes shared between producer and consumer. Please wait for https://github.com/apache/kafka/pull/11627 to get merged before reviewing this. I've marked it as `WIP`. Thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #11712: WIP: Put failed tasks to end of processing list
ableegoldman commented on a change in pull request #11712: URL: https://github.com/apache/kafka/pull/11712#discussion_r800029186

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java ##

@@ -270,6 +278,23 @@ Task task(final TaskId taskId) {
         return readOnlyActiveTasks;
     }

+    List<Task> orderedActiveTasks() {
+        return Collections.unmodifiableList(orderedActiveTasks);
+    }
+
+    void moveActiveTasksToTailFor(final String topologyName) {
+        final List<Task> tasksToMove = new LinkedList<>();
+        final Iterator<Task> iterator = orderedActiveTasks.iterator();
+        while (iterator.hasNext()) {
+            final Task task = iterator.next();
+            if (task.id().topologyName().equals(topologyName)) {
+                iterator.remove();
+                tasksToMove.add(task);
+            }
+        }
+        orderedActiveTasks.addAll(tasksToMove);

Review comment:
> So we need to take the tasks that we know will fail and process all the other tasks without them

There's definitely an implicit assumption here about the exception being (a) deterministic, and (b) correlated directly to some aspect of that specific task (eg de/serialization exception, NPE from input with a null field, authorization failed on its topics) -- and not a system error that happened to hit during that task's processing (eg RocksDBException: too many open files, out of memory, etc).

I'm not saying we need to account for this in the first pass; I do think it's reasonable to assume that reprocessing the failed task will result in the same error, since that's definitely true for what I suspect are the most common or large majority of errors: the de/serialization or invalid timestamp errors, NPEs, etc. But it's worth keeping in mind, especially once we roll this out and can get actual data on how reasonable these assumptions are.

On that note -- I should make sure to add some kind of logging that will allow us to count how often a failed task repeated the same error, or any kind of error. (Could even be a metric eventually?) In the mid-to-far future we should have some kind of finer-grained error classification implemented that we could lean on as a heuristic for whether to retry the task again immediately, back off for a while, or even restart the runtime for fatal system errors (eg OOM).

Anyway, I'll file some tickets for all this in the V2+ milestone; I just wanted to get us thinking about this sort of thing so we have some vision of the future optimized error handling mechanism to inform how we lay the groundwork now.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
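[Editor's note] The "log whether a retried task repeats the same error" idea in the comment above could look roughly like this sketch. It is not part of the PR: `TaskId` is the real Streams class, but `TaskFailureTracker`, `recordFailure`, and `recordSuccess` are hypothetical names, and a real version would use the Streams logger and metrics rather than stdout.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.streams.processor.TaskId;

// Hypothetical sketch of per-task failure tracking: remembering the last exception
// class per task lets logs distinguish a repeated, likely-deterministic task error
// from a different (more likely transient/system) error on retry.
public class TaskFailureTracker {
    private final Map<TaskId, Class<? extends Throwable>> lastError = new HashMap<>();

    public void recordFailure(final TaskId taskId, final Throwable error) {
        final Class<? extends Throwable> previous = lastError.put(taskId, error.getClass());
        if (previous == null) {
            System.out.printf("Task %s failed for the first time with %s%n",
                taskId, error.getClass().getSimpleName());
        } else if (previous.equals(error.getClass())) {
            System.out.printf("Task %s repeated the same error %s after retry%n",
                taskId, error.getClass().getSimpleName());
        } else {
            System.out.printf("Task %s failed with %s after previously failing with %s%n",
                taskId, error.getClass().getSimpleName(), previous.getSimpleName());
        }
    }

    public void recordSuccess(final TaskId taskId) {
        lastError.remove(taskId); // clear the history once the task processes cleanly
    }
}
```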
[GitHub] [kafka] showuon commented on a change in pull request #11691: KAFKA-13598: enable idempotence producer by default and validate the configs
showuon commented on a change in pull request #11691: URL: https://github.com/apache/kafka/pull/11691#discussion_r800029493 ## File path: docs/upgrade.html ## @@ -19,6 +19,13 @@
[GitHub] [kafka] showuon commented on a change in pull request #11691: KAFKA-13598: enable idempotence producer by default and validate the configs
showuon commented on a change in pull request #11691: URL: https://github.com/apache/kafka/pull/11691#discussion_r800030072

## File path: clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java ##

@@ -514,12 +512,11 @@ public ProducerConfig(Map<String, Object> props) {
     }

     boolean idempotenceEnabled() {
-        boolean userConfiguredIdempotence = this.originals().containsKey(ENABLE_IDEMPOTENCE_CONFIG);
         boolean userConfiguredTransactions = this.originals().containsKey(TRANSACTIONAL_ID_CONFIG);
-        boolean idempotenceEnabled = userConfiguredIdempotence && this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
-
-        if (!idempotenceEnabled && userConfiguredIdempotence && userConfiguredTransactions)
+        boolean idempotenceEnabled = this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
+        if (!idempotenceEnabled && userConfiguredTransactions)
             throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
+
         return userConfiguredTransactions || idempotenceEnabled;

Review comment: You are right! After we default `enable.idempotence` to true, there's no way to have `idempotenceEnabled==false` but `userConfiguredTransactions==true`. Removed it.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
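[Editor's note] Applying the removal the comment agrees to, the method reduces to something like the sketch below. This is an illustration derived from the diff above, not the exact merged code; it assumes the surrounding `ProducerConfig` class for the constants and `getBoolean`/`originals` helpers.

```java
// Sketch of the simplified check inside ProducerConfig (illustrative only).
boolean idempotenceEnabled() {
    boolean userConfiguredTransactions = this.originals().containsKey(TRANSACTIONAL_ID_CONFIG);
    boolean idempotenceEnabled = this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
    if (!idempotenceEnabled && userConfiguredTransactions)
        throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG
            + " without also enabling idempotence.");
    // Past the guard, userConfiguredTransactions implies idempotenceEnabled,
    // so the old "userConfiguredTransactions || ..." disjunct is redundant.
    return idempotenceEnabled;
}
```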
[GitHub] [kafka] dajac commented on a change in pull request #11586: KAFKA-13516: Connection level metrics are not closed
dajac commented on a change in pull request #11586: URL: https://github.com/apache/kafka/pull/11586#discussion_r800030837

## File path: clients/src/main/java/org/apache/kafka/common/network/Selector.java ##

@@ -1144,7 +1149,11 @@ public void close() {
     public SelectorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection) {
         this.metrics = metrics;
         this.metricTags = metricTags;
-        this.metricsPerConnection = metricsPerConnection;
+        if (metricsPerConnection) {
+            this.connectionMetrics = new ConcurrentHashMap<>();

Review comment: Do we really need a `ConcurrentHashMap` here? My understanding is that the selector should be used by a single thread. Isn't it the case here?

## File path: clients/src/main/java/org/apache/kafka/common/network/Selector.java ##

@@ -1305,44 +1314,62 @@ private Sensor sensor(String name, Sensor... parents) {
     }

     public void maybeRegisterConnectionMetrics(String connectionId) {
-        if (!connectionId.isEmpty() && metricsPerConnection) {
-            // if one sensor of the metrics has been registered for the connection,
-            // then all other sensors should have been registered; and vice versa
-            String nodeRequestName = "node-" + connectionId + ".requests-sent";
-            Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
-            if (nodeRequest == null) {
-                Map<String, String> tags = new LinkedHashMap<>(metricTags);
-                tags.put("node-id", "node-" + connectionId);
-
-                nodeRequest = sensor(nodeRequestName);
-                nodeRequest.add(createMeter(metrics, perConnectionMetricGrpName, tags, new WindowedCount(), "request", "requests sent"));
-                MetricName metricName = metrics.metricName("request-size-avg", perConnectionMetricGrpName, "The average size of requests sent.", tags);
-                nodeRequest.add(metricName, new Avg());
-                metricName = metrics.metricName("request-size-max", perConnectionMetricGrpName, "The maximum size of any request sent.", tags);
-                nodeRequest.add(metricName, new Max());
-
-                String bytesSentName = "node-" + connectionId + ".bytes-sent";
-                Sensor bytesSent = sensor(bytesSentName);
-                bytesSent.add(createMeter(metrics, perConnectionMetricGrpName, tags, "outgoing-byte", "outgoing bytes"));
-
-                String nodeResponseName = "node-" + connectionId + ".responses-received";
-                Sensor nodeResponse = sensor(nodeResponseName);
-                nodeResponse.add(createMeter(metrics, perConnectionMetricGrpName, tags, new WindowedCount(), "response", "responses received"));
-
-                String bytesReceivedName = "node-" + connectionId + ".bytes-received";
-                Sensor bytesReceive = sensor(bytesReceivedName);
-                bytesReceive.add(createMeter(metrics, perConnectionMetricGrpName, tags, "incoming-byte", "incoming bytes"));
-
-                String nodeTimeName = "node-" + connectionId + ".latency";
-                Sensor nodeRequestTime = sensor(nodeTimeName);
-                metricName = metrics.metricName("request-latency-avg", perConnectionMetricGrpName, tags);
-                nodeRequestTime.add(metricName, new Avg());
-                metricName = metrics.metricName("request-latency-max", perConnectionMetricGrpName, tags);
-                nodeRequestTime.add(metricName, new Max());
-            }
+        if (!connectionId.isEmpty() && connectionMetrics != null) {
+            connectionMetrics.computeIfAbsent(connectionId, (key) -> {
+                // key: connection id
+                // value: set of sensors (currently null)
+                return perConnectionSensors(key);
+            });
         }
     }
+
+    public void maybeUnregisterConnectionMetrics(String connectionId) {
+        if (!connectionId.isEmpty() && connectionMetrics != null) {
+            connectionMetrics.computeIfPresent(connectionId, (key, value) -> {
+                // key: connection id
+                // value: set of sensors
+                for (Sensor sensor : value) {
+                    metrics.removeSensor(sensor.name());
+                }
+                return null;
+            });
+        }
+    }
+
+    private Set<Sensor> perConnectionSensors(String connectionId) {

Review comment: Would it make sense to create a `ConnectionMetrics` class to hold all the connection metrics? That would give us an opportunity to improve all the `record*` methods as well. They could get the sensors based on the `connectionId`.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
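[Editor's note] A `ConnectionMetrics` holder along the lines dajac suggests could look like the sketch below. Only the class name comes from the review comment; the specific fields and the `removeAll` helper are assumptions for illustration, not code from the PR.

```java
import java.util.Arrays;
import java.util.List;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;

// Hypothetical sketch of the suggested per-connection metrics holder: one object
// owns every sensor for a connection, so registration and cleanup stay symmetric,
// and record* methods can use fields instead of rebuilding sensor names.
final class ConnectionMetrics {
    final Sensor requestsSent;
    final Sensor bytesSent;
    final Sensor responsesReceived;
    final Sensor bytesReceived;
    final Sensor requestLatency;

    ConnectionMetrics(Sensor requestsSent, Sensor bytesSent, Sensor responsesReceived,
                      Sensor bytesReceived, Sensor requestLatency) {
        this.requestsSent = requestsSent;
        this.bytesSent = bytesSent;
        this.responsesReceived = responsesReceived;
        this.bytesReceived = bytesReceived;
        this.requestLatency = requestLatency;
    }

    private List<Sensor> all() {
        return Arrays.asList(requestsSent, bytesSent, responsesReceived, bytesReceived, requestLatency);
    }

    // Remove every sensor this connection registered, mirroring maybeUnregisterConnectionMetrics.
    void removeAll(Metrics metrics) {
        for (Sensor sensor : all())
            metrics.removeSensor(sensor.name());
    }
}
```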
[GitHub] [kafka] runom opened a new pull request #11735: MINOR: Check the help and version options firstly
runom opened a new pull request #11735: URL: https://github.com/apache/kafka/pull/11735

Currently, `bin/kafka-consumer-groups.sh --version` requires the unnecessary `bootstrap-server` option.
```
% bin/kafka-consumer-groups.sh --version
Missing required argument "[bootstrap-server]"
Option        Description
------        -----------
--all-groups  Apply to all consumer groups.
--all-topics  Consider all topics assigned to a group in the `reset-offsets` process.
...
--version     Display Kafka version.
```
```
% bin/kafka-consumer-groups.sh --version --bootstrap-server=localhost:9092
3.2.0-SNAPSHOT (Commit:21c3009ac12f79d0)
```
This PR fixes the problem:
```
% bin/kafka-consumer-groups.sh --version
3.2.0-SNAPSHOT (Commit:efe4bfd2c49997f7)
```

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
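[Editor's note] One way to get this behavior with the jopt-simple parser Kafka's tools use is to mark `--version` as a "help" option via `forHelp()`, which suppresses the missing-required-argument check when that option appears. The sketch below illustrates that idea under those assumptions; it is not the actual patch, and `VersionFirstToolSketch` is a hypothetical class name.

```java
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import org.apache.kafka.common.utils.AppInfoParser;

// Illustrative sketch (not the PR diff): an option marked forHelp() makes
// jopt-simple skip required-option validation, so --version works alone.
public class VersionFirstToolSketch {
    public static void main(String[] args) {
        OptionParser parser = new OptionParser();
        parser.accepts("bootstrap-server", "The server(s) to connect to.")
              .withRequiredArg().ofType(String.class).required();
        parser.accepts("version", "Display Kafka version.").forHelp();

        OptionSet options = parser.parse(args);
        if (options.has("version")) {
            // AppInfoParser carries the build version and commit id.
            System.out.println(AppInfoParser.getVersion() + " (Commit:" + AppInfoParser.getCommitId() + ")");
            return;
        }
        // ... normal tool logic using options.valueOf("bootstrap-server") ...
    }
}
```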