[ https://issues.apache.org/jira/browse/KAFKA-13538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Haoze Wu updated KAFKA-13538:
-----------------------------
    Attachment: Screenshot from 2021-12-12 16-33-11.png
                Screenshot from 2021-12-12 16-33-06.png
    Description: 
We were using the official Kafka Java API to create a topic in a Kafka broker cluster (3 brokers):
{code:java}
CreateTopicsResult result = admin.createTopics(...);
... = result.all().get(); {code}
The topic we create always has a replication factor of 2 and 2 partitions (a fuller, self-contained sketch of this call appears after this description).

If one of the brokers crashes for some reason and the client happens to send the topic-creation request to that crashed broker, we usually observe that the client suffers a delay of a few seconds due to the disconnection, after which it automatically connects to another broker and creates the topic there. All of this happens inside the client, underneath `admin.createTopics(...)` and `result.all().get()`.

However, we found that sometimes `result.all().get()` throws `TopicExistsException`, even though we had never created this topic beforehand.

After investigating the client source code, we found that the issue happens as follows (a caller-side mitigation is also sketched after this description):
# The client connects to a broker (say, broker X) and sends the topic-creation request.
# The topic has a replication factor of 2 and 2 partitions, so broker X may forward this information to another broker.
# Broker X suddenly crashes for some reason, before the response to the topic-creation request has been sent back to the client.
# The client eventually learns that broker X has crashed, but never receives the response. It therefore treats the topic-creation request as failed, connects to another broker (say, broker Y), and sends the topic-creation request again.
# The first request (replication factor = 2, partitions = 2) had been partially executed before broker X crashed, so broker Y may already have done the work broker X asked of it; for example, broker Y may already hold metadata for this topic. When broker Y runs its sanity check against that metadata, it finds that the topic already exists, so it directly returns `TopicExistsException` as the response.
# The client receives `TopicExistsException`, takes it at face value as "this topic has already been created", and the exception is thrown back to the user through `result.all().get()`.

There are 2 diagrams illustrating these 6 steps:

!Screenshot from 2021-12-12 16-33-06.png!

!Screenshot from 2021-12-12 16-33-11.png!

Now the core question is whether this workflow violates the semantics and design of the Kafka client API. We read the “Create Topics Response” section in KIP-4 ([https://cwiki.apache.org/confluence/display/kafka/kip-4+-+command+line+and+centralized+administrative+operations]). The description in KIP-4 focuses on batched topic-creation requests and on how the individual creations work independently; it does not say how the client should handle the scenario described above.

According to “common sense”, we think the client should be able to recognize that the metadata already present on broker Y was in fact created by this same client via the crashed broker X, and that the client should not throw `TopicExistsException` to the user.
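For illustration, below is a minimal, self-contained version of the creation call described above. The class name, topic name, and bootstrap addresses are assumptions for this sketch, not the reporter's exact code:
{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;

// Hypothetical driver reproducing the call pattern from the description.
public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Illustrative addresses for the 3-broker cluster mentioned above.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                  "broker1:9092,broker2:9092,broker3:9092");

        try (Admin admin = Admin.create(props)) {
            // Topic with 2 partitions and replication factor 2, as in the report.
            NewTopic topic = new NewTopic("example-topic", 2, (short) 2);
            CreateTopicsResult result = admin.createTopics(Collections.singletonList(topic));
            // In the buggy scenario, this get() throws an ExecutionException
            // whose cause is TopicExistsException.
            result.all().get();
        }
    }
}
{code}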
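Separately, a common caller-side mitigation is to treat `TopicExistsException` as success when the creation is intended to be idempotent. This is only a sketch of a workaround under that assumption, not the fix being asked for here (the report argues the client itself should not surface the exception), and the helper name is hypothetical:
{code:java}
import java.util.Collections;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;

// Hypothetical helper illustrating a caller-side workaround.
public final class IdempotentTopicCreation {
    public static void createTopicIdempotently(Admin admin, NewTopic topic)
            throws InterruptedException, ExecutionException {
        try {
            admin.createTopics(Collections.singletonList(topic)).all().get();
        } catch (ExecutionException e) {
            if (e.getCause() instanceof TopicExistsException) {
                // The "existing" metadata may have been left behind by this client's
                // own earlier attempt that died with the crashed broker, so treat
                // the exception as success for idempotent creation.
                return;
            }
            throw e;
        }
    }
}
{code}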
> Unexpected TopicExistsException related to Admin#createTopics after broker crash
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-13538
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13538
>             Project: Kafka
>          Issue Type: Bug
>          Components: build
>    Affects Versions: 2.8.0
>            Reporter: Haoze Wu
>            Priority: Major
>        Attachments: Screenshot from 2021-12-12 16-33-06.png, Screenshot from 2021-12-12 16-33-11.png
>
-- This message was sent by Atlassian Jira (v8.20.1#820001)