[ https://issues.apache.org/jira/browse/KAFKA-8877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16923379#comment-16923379 ]
Oleg Kuznetsov commented on KAFKA-8877:
---------------------------------------

[~huxi_2b] Yes, you are right. But it looks like the same problem has migrated to *StickyPartitionCache*: *indexCache* had better be populated/changed atomically.

{code:java}
public int nextPartition(String topic, Cluster cluster, int prevPartition) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    Integer oldPart = indexCache.get(topic);
    Integer newPart = oldPart;
    // Check that the current sticky partition for the topic is either not set or that the partition that
    // triggered the new batch matches the sticky partition that needs to be changed.
    if (oldPart == null || oldPart == prevPartition) {
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() < 1) {
            Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
            newPart = random % partitions.size();
        } else if (availablePartitions.size() == 1) {
            newPart = availablePartitions.get(0).partition();
        } else {
            while (newPart == null || newPart.equals(oldPart)) {
                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = availablePartitions.get(random % availablePartitions.size()).partition();
            }
        }
        // Only change the sticky partition if it is null or prevPartition matches the current sticky partition.
        if (oldPart == null) {
            indexCache.putIfAbsent(topic, newPart);
        } else {
            indexCache.replace(topic, prevPartition, newPart);
        }
        return indexCache.get(topic);
    }
    return indexCache.get(topic);
}
{code}

> Race condition on partition counter
> -----------------------------------
>
>                 Key: KAFKA-8877
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8877
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer
>    Affects Versions: 2.2.1
>            Reporter: Oleg Kuznetsov
>            Priority: Major
>
> In the method:
> *org.apache.kafka.clients.producer.internals.DefaultPartitioner#nextValue*
> {code:java}
> private int nextValue(String topic) {
>     AtomicInteger counter = topicCounterMap.get(topic);
>     if (null == counter) {
>         counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
>         AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
>         if (currentCounter != null) {
>             counter = currentCounter;
>         }
>     }
>     return counter.getAndIncrement();
> }
> {code}
> the counter might be created multiple times instead of once.
> I propose to replace it with something like *topicCounterMap.compute(topic, _ -> ...* (init the counter once per topic))

--
This message was sent by Atlassian Jira
(v8.3.2#803003)
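
For reference, the replacement proposed in the issue description could look roughly like the sketch below. It is not the actual Kafka code: the class name *TopicCounters* is hypothetical, and it uses *computeIfAbsent* rather than the *compute* call named in the proposal, on the assumption that *topicCounterMap* is backed by a *ConcurrentHashMap*, whose *computeIfAbsent* applies the mapping function at most once per key.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the counter logic in DefaultPartitioner; not the Kafka class itself.
class TopicCounters {
    // Assumption: the map is a ConcurrentHashMap, so computeIfAbsent is atomic per key.
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    int nextValue(String topic) {
        // The counter for a topic is constructed only when no mapping exists yet,
        // and at most one instance is ever created and stored per topic.
        AtomicInteger counter = topicCounterMap.computeIfAbsent(
            topic, t -> new AtomicInteger(ThreadLocalRandom.current().nextInt()));
        return counter.getAndIncrement();
    }
}
```

Note that the original *putIfAbsent* version also ends up with a single stored counter per topic; the difference in this sketch is only that no throwaway *AtomicInteger* is allocated when two threads race on the first call.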