Hi Kafka team,
I ran into something strange with Kafka rebalancing. If I increase the number of partitions of a topic that is subscribed to by several Java consumers (all in the same group), no rebalance occurs. Furthermore, if I then start a new consumer (or stop one) to force a rebalance, the newly added partitions are still not assigned until I stop all consumers and start them again.

Is that normal?

Thanks,
Ruiping Li

--------------------------------------------------------------------------------

Below is my test:

1. Start Kafka and ZK. Create a normal topic (test-topic) with 1 partition:

   ./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic test-topic --partitions 1 --replication-factor 1 --config retention.ms=604800000

2. Start 2 Java consumers (C1, C2) that subscribe to test-topic.

3. Add 2 partitions to test-topic (1 -> 3):

   rpli@rpli-mac:~/Softwares/Tools/kafka_2.11-1.0.0$ ./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --topic test-topic --partitions 3
   WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
   Adding partitions succeeded!

   The increase succeeded:

   rpli@rpli-mac:~/Softwares/Tools/kafka_2.11-1.0.0$ ./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic test-topic
   Topic:test-topic PartitionCount:3 ReplicationFactor:1 Configs:retention.ms=604800000
       Topic: test-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
       Topic: test-topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
       Topic: test-topic Partition: 2 Leader: 0 Replicas: 0 Isr: 0

   No rebalance occurs in C1 or C2.

4. Start a new consumer C3 that subscribes to test-topic. A rebalance occurs, but only partition test-topic-0 is involved in the reassignment; test-topic-1 and test-topic-2 are not.

5. Stop C2 and C3. test-topic-1 and test-topic-2 are still not assigned.

6. Stop all running consumers and then start them again. test-topic-0, test-topic-1 and test-topic-2 are all assigned normally.
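
For reference, the assignments I expected after each rebalance can be sketched as follows. This is only a minimal sketch, assuming the default range assignment strategy and a single topic. RangeAssignmentSketch and its assign method are hypothetical names for illustration and not part of the Kafka API, and C1..C3 are just labels (real member ids are generated by the group coordinator, so which consumer sorts first may differ).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

public class RangeAssignmentSketch {

    // Hypothetical helper, not part of the Kafka API: splits partitions
    // 0..partitionCount-1 of one topic across the consumers in contiguous
    // blocks, giving the leftover partitions to the consumers that sort first.
    public static Map<String, List<Integer>> assign(int partitionCount, List<String> consumers) {
        // Sort the consumer labels and drop duplicates.
        List<String> sorted = new ArrayList<>(new TreeSet<>(consumers));
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (sorted.isEmpty()) {
            return result;
        }
        int base = partitionCount / sorted.size();   // partitions every consumer gets
        int extra = partitionCount % sorted.size();  // first 'extra' consumers get one more
        int next = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int p = 0; p < count; p++) {
                owned.add(next++);
            }
            result.put(sorted.get(i), owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // Step 2: 1 partition, 2 consumers.
        System.out.println(assign(1, Arrays.asList("C1", "C2")));       // {C1=[0], C2=[]}
        // Step 4: 3 partitions, 3 consumers.
        System.out.println(assign(3, Arrays.asList("C1", "C2", "C3"))); // {C1=[0], C2=[1], C3=[2]}
        // Step 5: 3 partitions, only C1 left.
        System.out.println(assign(3, Arrays.asList("C1")));             // {C1=[0, 1, 2]}
    }
}
```

In other words, after steps 4 and 5 I expected all three partitions to be handed out, but that is only what I see after restarting every consumer (step 6).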

Environment

kafka & Java API version: kafka_2.12-2.0.0 (I also tried kafka_2.11-1.0.0 and kafka_2.10-0.10.2.1, same result)
zookeeper: 3.4.13

Consumer code:

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// consumer
public class KafkaConsumerThread extends Thread {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerThread.class);

    // consumer settings
    public static KafkaConsumer<String, String> createNativeConsumer(String groupName, String kafkaBootstrap) {
        Properties props = new Properties();
        props.put("bootstrap.servers", kafkaBootstrap);
        props.put("group.id", groupName);
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", true);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }

    // volatile, because stopConsumer() is called from a different thread
    private volatile boolean stop = false;
    private final KafkaConsumer<String, String> consumer;
    private final String topicName;
    private final ConsumerRebalanceListener consumerRebalanceListener;
    private final AtomicLong receivedRecordNumber = new AtomicLong(0);

    public KafkaConsumerThread(String topicName, String groupName,
                               ConsumerRebalanceListener consumerRebalanceListener, String kafkaBootstrap) {
        this.consumer = createNativeConsumer(groupName, kafkaBootstrap);
        this.topicName = topicName;
        this.consumerRebalanceListener = consumerRebalanceListener;
    }

    @Override
    public void run() {
        log.info("Start consumer ..");
        consumer.subscribe(Collections.singleton(topicName), consumerRebalanceListener);
        while (!stop) {
            try {
                // poll(long) is available in all client versions listed above
                ConsumerRecords<String, String> records = consumer.poll(100);
                receivedRecordNumber.addAndGet(records.count());
                for (ConsumerRecord<String, String> record : records) {
                    log.info("Receive [key:{}][value:{}]", record.key(), record.value());
                }
            } catch (TimeoutException e) {
                log.info("no data");
            }
        }
        consumer.close();
    }

    public void stopConsumer() {
        this.stop = true;
    }
}