[ https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17494438#comment-17494438 ]
Martijn Visser commented on FLINK-26033:
----------------------------------------

[~tinny] I've assigned it to you. I'm sure that [~renqs] can help with a review.

> In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it
> does not take effect
> --------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26033
>                 URL: https://issues.apache.org/jira/browse/FLINK-26033
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.13.3, 1.11.6, 1.12.7, 1.14.3
>            Reporter: shizhengchao
>            Assignee: shizhengchao
>            Priority: Major
>
> In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it
> does not take effect. Flink treats 'default' and 'round-robin' as the same
> strategy.
> {code:java}
> // code placeholder
> public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
>         ReadableConfig tableOptions, ClassLoader classLoader) {
>     return tableOptions
>             .getOptional(SINK_PARTITIONER)
>             .flatMap(
>                     (String partitioner) -> {
>                         switch (partitioner) {
>                             case SINK_PARTITIONER_VALUE_FIXED:
>                                 return Optional.of(new FlinkFixedPartitioner<>());
>                             case SINK_PARTITIONER_VALUE_DEFAULT:
>                             case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
>                                 return Optional.empty();
>                                 // Default fallback to full class name of the partitioner.
>                             default:
>                                 return Optional.of(
>                                         initializePartitioner(partitioner, classLoader));
>                         }
>                     });
> } {code}
> Both options fall back to Kafka's default partitioner, and DefaultPartitioner
> handles only two scenarios:
> 1. When there is no key, the partition is chosen by the sticky partition cache
> (effectively random).
> 2. When there is a key, the partition is the hash of the key modulo the number
> of partitions.
> {code:java}
> // org.apache.kafka.clients.producer.internals.DefaultPartitioner
> public int partition(String topic, Object key, byte[] keyBytes, Object value,
>         byte[] valueBytes, Cluster cluster) {
>     if (keyBytes == null) {
>         // Random when there is no key
>         return stickyPartitionCache.partition(topic, cluster);
>     }
>     List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
>     int numPartitions = partitions.size();
>     // hash the keyBytes to choose a partition
>     return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
> } {code}
> Therefore, KafkaConnector has no real round-robin strategy. But we can
> borrow from Kafka's RoundRobinPartitioner:
> {code:java}
> // code placeholder
> public class RoundRobinPartitioner implements Partitioner {
>     private final ConcurrentMap<String, AtomicInteger> topicCounterMap =
>             new ConcurrentHashMap<>();
>
>     public void configure(Map<String, ?> configs) {}
>
>     /**
>      * Compute the partition for the given record.
>      *
>      * @param topic The topic name
>      * @param key The key to partition on (or null if no key)
>      * @param keyBytes serialized key to partition on (or null if no key)
>      * @param value The value to partition on or null
>      * @param valueBytes serialized value to partition on or null
>      * @param cluster The current cluster metadata
>      */
>     @Override
>     public int partition(String topic, Object key, byte[] keyBytes, Object value,
>             byte[] valueBytes, Cluster cluster) {
>         List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
>         int numPartitions = partitions.size();
>         int nextValue = nextValue(topic);
>         List<PartitionInfo> availablePartitions =
>                 cluster.availablePartitionsForTopic(topic);
>         if (!availablePartitions.isEmpty()) {
>             int part = Utils.toPositive(nextValue) % availablePartitions.size();
>             return availablePartitions.get(part).partition();
>         } else {
>             // no partitions are available, give a non-available partition
>             return Utils.toPositive(nextValue) % numPartitions;
>         }
>     }
>
>     private int nextValue(String topic) {
>         AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> {
>             return new AtomicInteger(0);
>         });
>         return counter.getAndIncrement();
>     }
>
>     public void close() {}
> } {code}

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
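
For illustration only: the per-topic counter idea from the quoted RoundRobinPartitioner could be carried over to the (targetTopic, partitions) arguments that Flink's FlinkKafkaPartitioner#partition receives, roughly as sketched below. The class name RoundRobinSketch is hypothetical, and the class deliberately does not extend FlinkKafkaPartitioner so that it compiles without Flink on the classpath. The record, key and value arguments are left out as an assumption of this sketch. It is not the change proposed or merged for this ticket.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch (not a Flink class): round-robin selection over a partition array. */
class RoundRobinSketch {
    // One counter per topic, mirroring topicCounterMap in Kafka's RoundRobinPartitioner.
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap =
            new ConcurrentHashMap<>();

    /**
     * Returns the next partition id for the given topic.
     *
     * @param targetTopic topic the record is written to
     * @param partitions  partition ids of that topic (assumed non-empty)
     */
    int partition(String targetTopic, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("No partitions for topic " + targetTopic);
        }
        int next = topicCounterMap
                .computeIfAbsent(targetTopic, k -> new AtomicInteger(0))
                .getAndIncrement();
        // Clear the sign bit so the index stays non-negative after the counter overflows.
        return partitions[(next & 0x7fffffff) % partitions.length];
    }
}
```

Each topic gets its own counter, so successive records for one topic walk through its partitions in order regardless of whether a key is set, which is the behavior the issue says 'round-robin' currently lacks.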