artemlivshits commented on code in PR #12049:
URL: https://github.com/apache/kafka/pull/12049#discussion_r865302205
##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -960,8 +1002,10 @@ private Future<RecordMetadata> doSend(ProducerRecord<K,
V> record, Callback call
" to class " +
producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName()
+
" specified in value.serializer", cce);
}
+
+ // Try to calculate partition, but note that after this call it
can be RecordMetadata.UNKNOWN_PARTITION,
+ // which means that the RecordAccumulator would pick a partition
based on broker load.
Review Comment:
Missed, now rephrased.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java:
##########
@@ -25,12 +23,15 @@
private final Cluster cluster;
private final Serializer<K> keySerializer;
- private final DefaultPartitioner defaultPartitioner;
+ @SuppressWarnings("deprecation")
+ private final
org.apache.kafka.clients.producer.internals.DefaultPartitioner
defaultPartitioner;
+
+ @SuppressWarnings("deprecation")
public DefaultStreamPartitioner(final Serializer<K> keySerializer, final
Cluster cluster) {
this.cluster = cluster;
this.keySerializer = keySerializer;
- this.defaultPartitioner = new DefaultPartitioner();
+ this.defaultPartitioner = new
org.apache.kafka.clients.producer.internals.DefaultPartitioner();
Review Comment:
DefaultPartitioner implements onNewBatch, but DefaultStreamPartitioner
never seems to call it (its DefaultPartitioner instance is a private field).
Without onNewBatch calls, DefaultPartitioner.partition would keep returning the
same partition for unkeyed messages.
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+/**
+ * Built-in default partitioner. Note, that this is just a utility class that
is used directly from
+ * RecordAccumulator, it does not implement the Partitioner interface.
+ *
+ * The class keeps track of various bookkeeping information required for
adaptive sticky partitioning
+ * (described in detail in KIP-794). There is one partitioner object per
topic.
+ */
+public class BuiltInPartitioner {
+ private final Logger log;
+ private final String topic;
+ private final int stickyBatchSize;
+
+ private volatile PartitionLoadStats partitionLoadStats = null;
+ private final AtomicReference<StickyPartitionInfo> stickyPartitionInfo =
new AtomicReference<>();
+
+ // Visible and used for testing only.
+ static volatile public Supplier<Integer> mockRandom = null;
+
+ /**
+ * BuiltInPartitioner constructor.
+ *
+ * @param topic The topic
+ * @param stickyBatchSize How much to produce to partition before switch
+ */
+ public BuiltInPartitioner(LogContext logContext, String topic, int
stickyBatchSize) {
+ this.log = logContext.logger(BuiltInPartitioner.class);
+ this.topic = topic;
+ this.stickyBatchSize = stickyBatchSize;
+ }
+
+ /**
+ * Calculate the next partition for the topic based on the partition load
stats.
+ */
+ private int nextPartition(Cluster cluster) {
+ int random = mockRandom != null ? mockRandom.get() :
Utils.toPositive(ThreadLocalRandom.current().nextInt());
+
+ // Cache volatile variable in local variable.
+ PartitionLoadStats partitionLoadStats = this.partitionLoadStats;
+
+ if (partitionLoadStats == null) {
+ // We don't have stats to do adaptive partitioning (or it's
disabled), just switch to the next
+ // partition based on uniform distribution.
+ List<PartitionInfo> availablePartitions =
cluster.availablePartitionsForTopic(topic);
+ if (availablePartitions.size() > 0)
+ return availablePartitions.get(random %
availablePartitions.size()).partition();
+
+ // We don't have available partitions, just pick one among all
partitions.
+ List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
+ return random % partitions.size();
+ } else {
+ // Calculate next partition based on load distribution.
+ // Note that partitions without leader are excluded from the
partitionLoadStats.
+ assert partitionLoadStats.length > 0;
+
+ int[] cumulativeFrequencyTable =
partitionLoadStats.cumulativeFrequencyTable;
+ int weightedRandom = random %
cumulativeFrequencyTable[partitionLoadStats.length - 1];
+
+ // By construction, the cumulative frequency table is sorted, so
we can use binary
+ // search to find the desired index.
+ int searchResult = Arrays.binarySearch(cumulativeFrequencyTable,
0, partitionLoadStats.length, weightedRandom);
+
+ // binarySearch results the index of the found element, or
-(insertion_point) - 1
+ // (where insertion_point is the index of the first element
greater than the key).
+ // We need to get the index of the first value that is strictly
greater, which
+ // would be the insertion point, except if we found the element
that's equal to
+ // the searched value (in this case we need to get next). For
example, if we have
+ // 4 5 8
+ // and we're looking for 3, then we'd get the insertion_point = 0,
and the function
+ // would return -0 - 1 = -1, by adding 1 we'd get 0. If we're
looking for 4, we'd
+ // get 0, and we need the next one, so adding 1 works here as well.
+ int partitionIndex = Math.abs(searchResult + 1);
+ assert partitionIndex < partitionLoadStats.length;
+ return partitionLoadStats.partitionIds[partitionIndex];
+ }
+ }
+
+ /**
+ * Test-only function. When partition load stats are defined, return the
end of range for the
+ * random number.
+ */
+ public int loadStatsRangeEnd() {
+ assert partitionLoadStats != null;
+ assert partitionLoadStats.length > 0;
+ return
partitionLoadStats.cumulativeFrequencyTable[partitionLoadStats.length - 1];
+ }
+
+ /**
+ * Peek currently chosen sticky partition. This method works in
conjunction with {@link #isPartitionChanged}
+ * and {@link #updatePartitionInfo}. The workflow is the following:
+ *
+ * 1. peekCurrentPartitionInfo is called to know which partition to lock.
+ * 2. Lock partition's batch queue.
+ * 3. isPartitionChanged under lock to make sure that nobody raced us.
+ * 4. Append data to buffer.
+ * 5. updatePartitionInfo to update produced bytes and maybe switch
partition.
+ *
+ * It's important that steps 3-5 are under partition's batch queue lock.
+ *
+ * @param cluster The cluster information (needed if there is no current
partition)
+ * @return sticky partition info object
+ */
+ StickyPartitionInfo peekCurrentPartitionInfo(Cluster cluster) {
+ StickyPartitionInfo partitionInfo = stickyPartitionInfo.get();
+ if (partitionInfo != null)
+ return partitionInfo;
+
+ // We're the first to create it.
+ int partition = nextPartition(cluster);
+ log.trace("Switching to partition {} in topic {}", partition, topic);
+ partitionInfo = new StickyPartitionInfo(partition);
+ if (stickyPartitionInfo.compareAndSet(null, partitionInfo))
+ return partitionInfo;
+
+ // Someone has raced us.
+ return stickyPartitionInfo.get();
+ }
+
+ /**
+ * Check if partition is changed by a concurrent thread. NOTE this
function needs to be called under
+ * the partition's batch queue lock.
+ *
+ * @param partitionInfo The sticky partition info object returned by
peekCurrentPartitionInfo
+ * @return true if sticky partition object is changed (race condition)
+ */
+ boolean isPartitionChanged(StickyPartitionInfo partitionInfo) {
+ // partitionInfo may be null if the caller didn't use built-in
partitioner.
+ return partitionInfo != null && stickyPartitionInfo.get() !=
partitionInfo;
+ }
+
+ /**
+ * Update partition info with the number of bytes appended and maybe
switch partition.
+ * NOTE this function needs to be called under the partition's batch queue
lock.
+ *
+ * @param partitionInfo The sticky partition info object returned by
peekCurrentPartitionInfo
+ * @param appendedBytes The number of bytes appended to this partition
+ * @param cluster The cluster information
+ */
+ void updatePartitionInfo(StickyPartitionInfo partitionInfo, int
appendedBytes, Cluster cluster) {
+ // partitionInfo may be null if the caller didn't use built-in
partitioner.
+ if (partitionInfo == null)
+ return;
+
+ assert partitionInfo == stickyPartitionInfo.get();
+ int producedBytes =
partitionInfo.producedBytes.addAndGet(appendedBytes);
+ if (producedBytes >= stickyBatchSize) {
+ // We've produced enough to this partition, switch to next.
+ int partition = nextPartition(cluster);
+ log.trace("Switching to partition {} in topic {}", partition,
topic);
+ StickyPartitionInfo newPartitionInfo = new
StickyPartitionInfo(partition);
+ stickyPartitionInfo.set(newPartitionInfo);
+ }
+ }
+
+ /**
+ * Update partition load stats from the queue sizes of each partition.
Review Comment:
Added comments for the params.
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+/**
+ * Built-in default partitioner. Note, that this is just a utility class that
is used directly from
+ * RecordAccumulator, it does not implement the Partitioner interface.
+ *
+ * The class keeps track of various bookkeeping information required for
adaptive sticky partitioning
+ * (described in detail in KIP-794). There is one partitioner object per
topic.
+ */
+public class BuiltInPartitioner {
+ private final Logger log;
+ private final String topic;
+ private final int stickyBatchSize;
+
+ private volatile PartitionLoadStats partitionLoadStats = null;
+ private final AtomicReference<StickyPartitionInfo> stickyPartitionInfo =
new AtomicReference<>();
+
+ // Visible and used for testing only.
+ static volatile public Supplier<Integer> mockRandom = null;
+
+ /**
+ * BuiltInPartitioner constructor.
+ *
+ * @param topic The topic
+ * @param stickyBatchSize How much to produce to partition before switch
+ */
+ public BuiltInPartitioner(LogContext logContext, String topic, int
stickyBatchSize) {
+ this.log = logContext.logger(BuiltInPartitioner.class);
+ this.topic = topic;
+ this.stickyBatchSize = stickyBatchSize;
+ }
+
+ /**
+ * Calculate the next partition for the topic based on the partition load
stats.
+ */
+ private int nextPartition(Cluster cluster) {
+ int random = mockRandom != null ? mockRandom.get() :
Utils.toPositive(ThreadLocalRandom.current().nextInt());
+
+ // Cache volatile variable in local variable.
+ PartitionLoadStats partitionLoadStats = this.partitionLoadStats;
+
+ if (partitionLoadStats == null) {
+ // We don't have stats to do adaptive partitioning (or it's
disabled), just switch to the next
+ // partition based on uniform distribution.
+ List<PartitionInfo> availablePartitions =
cluster.availablePartitionsForTopic(topic);
+ if (availablePartitions.size() > 0)
+ return availablePartitions.get(random %
availablePartitions.size()).partition();
+
+ // We don't have available partitions, just pick one among all
partitions.
+ List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
+ return random % partitions.size();
+ } else {
+ // Calculate next partition based on load distribution.
+ // Note that partitions without leader are excluded from the
partitionLoadStats.
+ assert partitionLoadStats.length > 0;
+
+ int[] cumulativeFrequencyTable =
partitionLoadStats.cumulativeFrequencyTable;
+ int weightedRandom = random %
cumulativeFrequencyTable[partitionLoadStats.length - 1];
+
+ // By construction, the cumulative frequency table is sorted, so
we can use binary
+ // search to find the desired index.
+ int searchResult = Arrays.binarySearch(cumulativeFrequencyTable,
0, partitionLoadStats.length, weightedRandom);
+
+ // binarySearch results the index of the found element, or
-(insertion_point) - 1
+ // (where insertion_point is the index of the first element
greater than the key).
+ // We need to get the index of the first value that is strictly
greater, which
+ // would be the insertion point, except if we found the element
that's equal to
+ // the searched value (in this case we need to get next). For
example, if we have
+ // 4 5 8
+ // and we're looking for 3, then we'd get the insertion_point = 0,
and the function
+ // would return -0 - 1 = -1, by adding 1 we'd get 0. If we're
looking for 4, we'd
+ // get 0, and we need the next one, so adding 1 works here as well.
+ int partitionIndex = Math.abs(searchResult + 1);
+ assert partitionIndex < partitionLoadStats.length;
+ return partitionLoadStats.partitionIds[partitionIndex];
+ }
+ }
+
+ /**
+ * Test-only function. When partition load stats are defined, return the
end of range for the
+ * random number.
+ */
+ public int loadStatsRangeEnd() {
+ assert partitionLoadStats != null;
+ assert partitionLoadStats.length > 0;
+ return
partitionLoadStats.cumulativeFrequencyTable[partitionLoadStats.length - 1];
+ }
+
+ /**
+ * Peek currently chosen sticky partition. This method works in
conjunction with {@link #isPartitionChanged}
+ * and {@link #updatePartitionInfo}. The workflow is the following:
+ *
+ * 1. peekCurrentPartitionInfo is called to know which partition to lock.
+ * 2. Lock partition's batch queue.
+ * 3. isPartitionChanged under lock to make sure that nobody raced us.
+ * 4. Append data to buffer.
+ * 5. updatePartitionInfo to update produced bytes and maybe switch
partition.
+ *
+ * It's important that steps 3-5 are under partition's batch queue lock.
+ *
+ * @param cluster The cluster information (needed if there is no current
partition)
+ * @return sticky partition info object
+ */
+ StickyPartitionInfo peekCurrentPartitionInfo(Cluster cluster) {
+ StickyPartitionInfo partitionInfo = stickyPartitionInfo.get();
+ if (partitionInfo != null)
+ return partitionInfo;
+
+ // We're the first to create it.
+ int partition = nextPartition(cluster);
+ log.trace("Switching to partition {} in topic {}", partition, topic);
+ partitionInfo = new StickyPartitionInfo(partition);
+ if (stickyPartitionInfo.compareAndSet(null, partitionInfo))
+ return partitionInfo;
+
+ // Someone has raced us.
+ return stickyPartitionInfo.get();
+ }
+
+ /**
+ * Check if partition is changed by a concurrent thread. NOTE this
function needs to be called under
+ * the partition's batch queue lock.
+ *
+ * @param partitionInfo The sticky partition info object returned by
peekCurrentPartitionInfo
+ * @return true if sticky partition object is changed (race condition)
+ */
+ boolean isPartitionChanged(StickyPartitionInfo partitionInfo) {
+ // partitionInfo may be null if the caller didn't use built-in
partitioner.
+ return partitionInfo != null && stickyPartitionInfo.get() !=
partitionInfo;
+ }
+
+ /**
+ * Update partition info with the number of bytes appended and maybe
switch partition.
+ * NOTE this function needs to be called under the partition's batch queue
lock.
+ *
+ * @param partitionInfo The sticky partition info object returned by
peekCurrentPartitionInfo
+ * @param appendedBytes The number of bytes appended to this partition
+ * @param cluster The cluster information
+ */
+ void updatePartitionInfo(StickyPartitionInfo partitionInfo, int
appendedBytes, Cluster cluster) {
+ // partitionInfo may be null if the caller didn't use built-in
partitioner.
+ if (partitionInfo == null)
+ return;
+
+ assert partitionInfo == stickyPartitionInfo.get();
+ int producedBytes =
partitionInfo.producedBytes.addAndGet(appendedBytes);
+ if (producedBytes >= stickyBatchSize) {
+ // We've produced enough to this partition, switch to next.
+ int partition = nextPartition(cluster);
+ log.trace("Switching to partition {} in topic {}", partition,
topic);
+ StickyPartitionInfo newPartitionInfo = new
StickyPartitionInfo(partition);
+ stickyPartitionInfo.set(newPartitionInfo);
+ }
+ }
+
+ /**
+ * Update partition load stats from the queue sizes of each partition.
+ * NOTE: queueSizes are modified in place to avoid allocations
+ *
+ * @param queueSizes The queue sizes
+ * @param partitionIds The partition ids for the queues
+ * @param length The logical length of the arrays (could be less): we may
eliminate some partitions
+ * based on latency, but to avoid reallocation of the
arrays, we just decrement
+ * logical length
+ * Visible for testing
+ */
+ public void updatePartitionLoadStats(int[] queueSizes, int[] partitionIds,
int length) {
+ if (queueSizes == null) {
+ log.trace("No load stats for topic {}, not using adaptive", topic);
+ partitionLoadStats = null;
+ return;
+ }
+ assert queueSizes.length == partitionIds.length;
+ assert length <= queueSizes.length;
+
+ // The queueSizes.length represents the number of all partitions in
the topic and if we have
+ // less than 2 partitions, there is no need to do adaptive logic.
+ // If partitioner.availability.timeout.ms != 0, then partitions that
experience high latencies
+ // (greater than partitioner.availability.timeout.ms) may be excluded,
the length represents
+ // partitions that are not excluded. If some partitions were
excluded, we'd still want to
+ // go through adaptive logic, even if we have one partition.
+ // See also RecordAccumulator#partitionReady where the queueSizes are
built.
+ if (length < 1 || queueSizes.length < 2) {
+ log.trace("The number of partitions is too small: {} and {}, not
using adaptive for topic {}",
Review Comment:
Added.
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+/**
+ * Built-in default partitioner. Note, that this is just a utility class that
is used directly from
+ * RecordAccumulator, it does not implement the Partitioner interface.
+ *
+ * The class keeps track of various bookkeeping information required for
adaptive sticky partitioning
+ * (described in detail in KIP-794). There is one partitioner object per
topic.
+ */
+public class BuiltInPartitioner {
+ private final Logger log;
+ private final String topic;
+ private final int stickyBatchSize;
+
+ private volatile PartitionLoadStats partitionLoadStats = null;
+ private final AtomicReference<StickyPartitionInfo> stickyPartitionInfo =
new AtomicReference<>();
+
+ // Visible and used for testing only.
+ static volatile public Supplier<Integer> mockRandom = null;
+
+ /**
+ * BuiltInPartitioner constructor.
+ *
+ * @param topic The topic
+ * @param stickyBatchSize How much to produce to partition before switch
+ */
+ public BuiltInPartitioner(LogContext logContext, String topic, int
stickyBatchSize) {
+ this.log = logContext.logger(BuiltInPartitioner.class);
+ this.topic = topic;
+ this.stickyBatchSize = stickyBatchSize;
+ }
+
+ /**
+ * Calculate the next partition for the topic based on the partition load
stats.
+ */
+ private int nextPartition(Cluster cluster) {
+ int random = mockRandom != null ? mockRandom.get() :
Utils.toPositive(ThreadLocalRandom.current().nextInt());
+
+ // Cache volatile variable in local variable.
+ PartitionLoadStats partitionLoadStats = this.partitionLoadStats;
+
+ if (partitionLoadStats == null) {
+ // We don't have stats to do adaptive partitioning (or it's
disabled), just switch to the next
+ // partition based on uniform distribution.
+ List<PartitionInfo> availablePartitions =
cluster.availablePartitionsForTopic(topic);
+ if (availablePartitions.size() > 0)
+ return availablePartitions.get(random %
availablePartitions.size()).partition();
+
+ // We don't have available partitions, just pick one among all
partitions.
+ List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
+ return random % partitions.size();
+ } else {
+ // Calculate next partition based on load distribution.
+ // Note that partitions without leader are excluded from the
partitionLoadStats.
+ assert partitionLoadStats.length > 0;
+
+ int[] cumulativeFrequencyTable =
partitionLoadStats.cumulativeFrequencyTable;
+ int weightedRandom = random %
cumulativeFrequencyTable[partitionLoadStats.length - 1];
+
+ // By construction, the cumulative frequency table is sorted, so
we can use binary
+ // search to find the desired index.
+ int searchResult = Arrays.binarySearch(cumulativeFrequencyTable,
0, partitionLoadStats.length, weightedRandom);
+
+ // binarySearch results the index of the found element, or
-(insertion_point) - 1
+ // (where insertion_point is the index of the first element
greater than the key).
+ // We need to get the index of the first value that is strictly
greater, which
+ // would be the insertion point, except if we found the element
that's equal to
+ // the searched value (in this case we need to get next). For
example, if we have
+ // 4 5 8
+ // and we're looking for 3, then we'd get the insertion_point = 0,
and the function
+ // would return -0 - 1 = -1, by adding 1 we'd get 0. If we're
looking for 4, we'd
+ // get 0, and we need the next one, so adding 1 works here as well.
+ int partitionIndex = Math.abs(searchResult + 1);
+ assert partitionIndex < partitionLoadStats.length;
+ return partitionLoadStats.partitionIds[partitionIndex];
+ }
+ }
+
+ /**
+ * Test-only function. When partition load stats are defined, return the
end of range for the
+ * random number.
+ */
+ public int loadStatsRangeEnd() {
+ assert partitionLoadStats != null;
+ assert partitionLoadStats.length > 0;
+ return
partitionLoadStats.cumulativeFrequencyTable[partitionLoadStats.length - 1];
+ }
+
+ /**
+ * Peek currently chosen sticky partition. This method works in
conjunction with {@link #isPartitionChanged}
+ * and {@link #updatePartitionInfo}. The workflow is the following:
+ *
+ * 1. peekCurrentPartitionInfo is called to know which partition to lock.
+ * 2. Lock partition's batch queue.
+ * 3. isPartitionChanged under lock to make sure that nobody raced us.
+ * 4. Append data to buffer.
+ * 5. updatePartitionInfo to update produced bytes and maybe switch
partition.
+ *
+ * It's important that steps 3-5 are under partition's batch queue lock.
+ *
+ * @param cluster The cluster information (needed if there is no current
partition)
+ * @return sticky partition info object
+ */
+ StickyPartitionInfo peekCurrentPartitionInfo(Cluster cluster) {
+ StickyPartitionInfo partitionInfo = stickyPartitionInfo.get();
+ if (partitionInfo != null)
+ return partitionInfo;
+
+ // We're the first to create it.
+ int partition = nextPartition(cluster);
+ log.trace("Switching to partition {} in topic {}", partition, topic);
Review Comment:
Done.
##########
clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java:
##########
@@ -117,10 +116,24 @@ public MockProducer(final Cluster cluster,
*
* Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner,
Serializer, Serializer)} new MockProducer(Cluster.empty(), autoComplete, new
DefaultPartitioner(), keySerializer, valueSerializer)}
*/
+ @SuppressWarnings("deprecation")
public MockProducer(final boolean autoComplete,
final Serializer<K> keySerializer,
final Serializer<V> valueSerializer) {
- this(Cluster.empty(), autoComplete, new DefaultPartitioner(),
keySerializer, valueSerializer);
+ this(Cluster.empty(), autoComplete, new
org.apache.kafka.clients.producer.internals.DefaultPartitioner(),
keySerializer, valueSerializer);
Review Comment:
DefaultPartitioner implements onNewBatch, but MockProducer doesn't call it.
It only calls partition(), which for unkeyed messages keeps returning the same
partition until onNewBatch is called.
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -1017,6 +1024,158 @@ public void testStickyBatches() throws Exception {
assertEquals(appends, 2 * expectedAppends);
}
+ @Test
+ public void testUniformBuiltInPartitioner() throws Exception {
+
+ try {
+ // Mock random number generator with just sequential integer.
+ AtomicInteger mockRandom = new AtomicInteger();
+ BuiltInPartitioner.mockRandom = () -> mockRandom.getAndAdd(1);
+
+ long totalSize = 1024 * 1024;
+ int batchSize = 128; // note that this is also a "sticky" limit
for the partitioner
+ RecordAccumulator accum = createTestRecordAccumulator(batchSize,
totalSize, CompressionType.NONE, 0);
+
+ // Set up callbacks so that we know what partition is chosen.
+ final int[] partition = {RecordMetadata.UNKNOWN_PARTITION};
Review Comment:
Sure
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]