shizhengchao created FLINK-26033:
------------------------------------

             Summary: In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect
                 Key: FLINK-26033
                 URL: https://issues.apache.org/jira/browse/FLINK-26033
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: 1.14.3, 1.13.3
            Reporter: shizhengchao
In the Kafka connector, setting 'sink.partitioner' to 'round-robin' has no effect, because Flink treats 'default' and 'round-robin' as the same strategy:

{code:java}
public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
        ReadableConfig tableOptions, ClassLoader classLoader) {
    return tableOptions
            .getOptional(SINK_PARTITIONER)
            .flatMap(
                    (String partitioner) -> {
                        switch (partitioner) {
                            case SINK_PARTITIONER_VALUE_FIXED:
                                return Optional.of(new FlinkFixedPartitioner<>());
                            case SINK_PARTITIONER_VALUE_DEFAULT:
                            case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
                                return Optional.empty();
                            // Default fallback to full class name of the partitioner.
                            default:
                                return Optional.of(
                                        initializePartitioner(partitioner, classLoader));
                        }
                    });
}
{code}

Both values return Optional.empty(), so no Flink partitioner is installed and every record is partitioned by Kafka's DefaultPartitioner. DefaultPartitioner handles two cases:

1. When the record has no key, it uses the sticky partition cache: a partition is chosen at random and reused until a new batch is created.
2. When the record has a key, it hashes the key with murmur2 and takes the result modulo the number of partitions.

{code:java}
// org.apache.kafka.clients.producer.internals.DefaultPartitioner
public int partition(String topic, Object key, byte[] keyBytes, Object value,
                     byte[] valueBytes, Cluster cluster) {
    if (keyBytes == null) {
        // No key: use the sticky partition for this topic
        return stickyPartitionCache.partition(topic, cluster);
    }
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    // hash the keyBytes to choose a partition
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
{code}

Therefore, the Kafka connector does not actually provide a round-robin strategy. We could borrow from Kafka's RoundRobinPartitioner:

{code:java}
// org.apache.kafka.clients.producer.RoundRobinPartitioner
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public class RoundRobinPartitioner implements Partitioner {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value,
                         byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> new AtomicInteger(0));
        return counter.getAndIncrement();
    }

    public void close() {}
}
{code}
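The same idea could also live on the Flink side instead of being delegated to Kafka. Below is a minimal sketch under stated assumptions: the class name RoundRobinSketch is hypothetical, not existing Flink code. It mirrors the shape of Flink's FlinkKafkaPartitioner#partition(record, key, value, targetTopic, partitions) but does not extend it, so it compiles with plain Java. Like Kafka's RoundRobinPartitioner it keeps one counter per topic and ignores the key. Unlike Kafka's version, it chooses from the partition ids passed in rather than from the currently available partitions.

{code:java}
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical round-robin partitioner sketch. Its partition(...) method is modeled on
 * FlinkKafkaPartitioner#partition, but the record and value arguments are omitted and
 * the class does not depend on Flink.
 */
final class RoundRobinSketch {
    // One counter per topic, as in Kafka's RoundRobinPartitioner.
    private final ConcurrentMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();

    /** Returns the next partition id for the topic; the key is deliberately ignored. */
    int partition(byte[] key, String targetTopic, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("topic has no partitions: " + targetTopic);
        }
        int next = counters.computeIfAbsent(targetTopic, t -> new AtomicInteger(0))
                .getAndIncrement();
        // Clear the sign bit (same as Kafka's Utils.toPositive) so the index
        // stays non-negative after the counter overflows.
        return partitions[(next & 0x7fffffff) % partitions.length];
    }

    public static void main(String[] args) {
        RoundRobinSketch partitioner = new RoundRobinSketch();
        int[] partitions = {0, 1, 2};
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < 6; i++) {
            out.append(partitioner.partition(null, "orders", partitions)).append(' ');
        }
        System.out.println(out.toString().trim()); // prints "0 1 2 0 1 2"
    }
}
{code}

In a real job each parallel sink subtask would hold its own partitioner instance and counter. The result would therefore be round-robin within each subtask rather than one global sequence across the whole job.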