Hi Kafka team,
I meet a strange thing about Kafka rebalance. If I increase partitions of a
topic which subscribed by some java consumers(in same one group), there is no
rebalance occur. Furthermore, if I start a new consumer (or stop one) to cause
a rebalance, the increased partitions could not be assigned, until I stop all
consumers and start them. Is that normal?
Thanks,
Ruiping Li
--------------------------------------------------------------------------------
Below is my test:
1. Start Kafka, ZK. Create a normal topic(test-topic) with 1 partitions
./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic test-topic
--partitions 1 --replication-factor 1 --config retention.ms=604800000
2. Start 2 java consumers (C1, C2), subscribe test-topic
3. Increase 2 partitions of test-topic
rpli@rpli-mac:~/Softwares/Tools/kafka_2.11-1.0.0$ ./bin/kafka-topics.sh
--zookeeper 127.0.0.1:2181 --alter --topic test-topic --partitions 3
WARNING: If partitions are increased for a topic that has a key, the partition
logic or ordering of the messages will be affected
Adding partitions succeeded!
Increasing succeeded:
rpli@rpli-mac:~/Softwares/Tools/kafka_2.11-1.0.0$ ./bin/kafka-topics.sh
--zookeeper 127.0.0.1:2181 --describe --topic test-topic
Topic:test-topic PartitionCount:3 ReplicationFactor:1
Configs:retention.ms=604800000
Topic: test-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test-topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: test-topic Partition: 2 Leader: 0 Replicas: 0 Isr: 0
There is no rebalance occur in C1, C2.
4. Start a new consumer C3 to subscribed test-topic. Rebalance occur, but only
partition test-topic-0 involved in reassigned, no test-topic-1 and
test-topic-2.
5. I try to stop C2, C3, and test-topic-1 and test-topic-2 still not involved.
6. Stop all running consumers, and then start them. All test-topic-0,1,2
assigned normally.
Environment
kafka & java api version: kafka_2.12-2.0.0 (I also tried kafka_2.11-1.0.0 and
kafka_2.10-0.10.2.1, same result)
zookeeper: 3.4.13
consumer code:
// consumer
public class KafkaConsumerThread extends Thread {
// consumer settings
public static org.apache.kafka.clients.consumer.KafkaConsumer<String,
String> createNativeConsumer(String groupName, String kafkaBootstrap) {
Properties props = new Properties();
props.put("bootstrap.servers", kafkaBootstrap);
props.put("group.id", groupName);
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", true);
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
return new KafkaConsumer<String, String>(props);
}
private static final Logger log =
LoggerFactory.getLogger(KafkaConsumerThread.class);
private boolean stop = false;
private KafkaConsumer<String, String> consumer;
private String topicName;
private ConsumerRebalanceListener consumerRebalanceListener;
private AtomicLong receivedRecordNumber = new AtomicLong(0);
public KafkaConsumerThread(String topicName, String groupName,
ConsumerRebalanceListener consumerRebalanceListener, String kafkaBootstrap) {
this.consumer = createNativeConsumer(groupName, kafkaBootstrap);
this.topicName = topicName;
this.consumerRebalanceListener = consumerRebalanceListener;
}
@Override
public void run() {
log.info("Start consumer ..");
consumer.subscribe(Collections.singleton(topicName),
consumerRebalanceListener);
while (!stop) {
try {
ConsumerRecords<String, String> records = consumer.poll(100);
receivedRecordNumber.addAndGet(records.count());
Iterator<ConsumerRecord<String, String>> iterator =
records.iterator();
while (iterator.hasNext()) {
ConsumerRecord<String, String> record = iterator.next();
log.info("Receive [key:{}][value:{}]", record.key(),
record.value());
}
} catch (TimeoutException e) {
log.info("no data");
}
}
consumer.close();
}
public void stopConsumer() {
this.stop = true;
}
}