Wenbing Shen created KAFKA-12493:
------------------------------------
Summary: The controller should handle the consistency between the
controllerContext and the partition replicas assignment on zookeeper
Key: KAFKA-12493
URL: https://issues.apache.org/jira/browse/KAFKA-12493
Project: Kafka
Issue Type: Bug
Components: controller
Affects Versions: 2.7.0, 2.6.0, 2.5.0, 2.4.0, 2.3.0, 2.2.0, 2.1.0, 2.0.0
Reporter: Wenbing Shen
Fix For: 3.0.0
This question can be linked to this email:
[https://lists.apache.org/thread.html/redf5748ec787a9c65fc48597e3d2256ffdd729de14afb873c63e6c5b%40%3Cusers.kafka.apache.org%3E]
This is a 100% recurring problem.
Problem description:
In the production environment of our customer’s site, the existing partitions
were redistributed in the code of colleagues in other departments and written
into zookeeper. This caused the controller to only judge the newly added
partitions when processing partition modification events. Partition allocation
plan and new partition and replica allocation in the partition state machine
and replica state machine, and issue LeaderAndISR and other control requests.
But the controller did not verify the existing partition replicas assigment in
the controllerContext and whether the original partition allocation on the
znode in zookeeper has changed. This seems to be no problem, but when we have
to restart the broker for some reasons, such as configuration updates and
upgrades Wait, this will cause this part of the topic in real-time production
to be abnormal, the controller cannot complete the allocation of the new
leader, and the original leader cannot correctly identify the replica allocated
on the current zookeeper. The real-time business in our customer's on-site
environment is interrupted and partially Data has been lost.
This problem can be stably reproduced in the following ways:
Adding partitions or modifying replicas of an existing topic through the
following code will cause the original partition replicas to be reallocated and
finally written to zookeeper.Next, the controller did not accurately process
this event, restart the topic related broker, this topic will not be able to be
produced and consumed.
{code:java}
public void updateKafkaTopic(KafkaTopicVO kafkaTopicVO) {
ZkUtils zkUtils = ZkUtils.apply(ZK_LIST, SESSION_TIMEOUT,
CONNECTION_TIMEOUT, JaasUtils.isZkSecurityEnabled());
try {
if (kafkaTopicVO.getPartitionNum() >= 0 &&
kafkaTopicVO.getReplicationNum() >= 0) {
// Get the original broker data information
Seq<BrokerMetadata> brokerMetadata =
AdminUtils.getBrokerMetadatas(zkUtils,
RackAwareMode.Enforced$.MODULE$,
Option.apply(null));
// Generate a new partition replica allocation plan
scala.collection.Map<Object, Seq<Object>> replicaAssign =
AdminUtils.assignReplicasToBrokers(brokerMetadata,
kafkaTopicVO.getPartitionNum(), // Number of partitions
kafkaTopicVO.getReplicationNum(), // Number of replicas per
partition
-1,
-1);
// Modify the partition replica allocation plan
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils,
kafkaTopicVO.getTopicNameList().get(0),
replicaAssign,
null,
true);
}
} catch (Exception e) {
System.out.println("Adjust partition abnormal");
System.exit(0);
} finally {
zkUtils.close();
}
}
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)