[ https://issues.apache.org/jira/browse/KAFKA-13488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17450618#comment-17450618 ]
Justine Olshan commented on KAFKA-13488:
----------------------------------------

To clarify, did KAFKA-12257 make this issue worse, or did it just not fix it all the way?

Producer fails to recover if topic gets deleted (and gets auto-created)
-----------------------------------------------------------------------

                Key: KAFKA-13488
                URL: https://issues.apache.org/jira/browse/KAFKA-13488
            Project: Kafka
         Issue Type: Bug
         Components: producer
   Affects Versions: 2.2.2, 2.3.1, 2.4.1, 2.5.1, 2.6.3, 2.7.2, 2.8.1
           Reporter: Prateek Agarwal
           Assignee: Prateek Agarwal
           Priority: Major
        Attachments: KAFKA-13488.patch

The producer currently fails to produce messages to a topic if the topic is deleted and then gets auto-created OR is created manually during the lifetime of the producer (and certain other conditions are met: the leader epochs of the deleted topic were > 0).

To reproduce, the following steps can be carried out:

0) Set up a cluster with 2 brokers, 0 and 1, with auto.create.topics.enable=true.
1) Create a topic T with 2 partitions: P0 -> (0,1), P1 -> (0,1).
2) Reassign the partitions such that P0 -> (1,0), P1 -> (1,0).
3) Create a producer P and send a few messages, which land on all the topic-partitions of topic T.
4) Delete the topic T.
5) Immediately send a new message from producer P; this message fails to send and eventually times out.

A test case that fails with the above steps is added at the end, as well as a patch file.

This happens since leaderEpoch (KIP-320) was introduced into the MetadataResponse (KAFKA-7738). A fix for this issue was attempted in KAFKA-12257, but that fix has a bug because of which the above use case still fails.

*Issue in the solution of KAFKA-12257:*
{code:java}
// org.apache.kafka.clients.Metadata.handleMetadataResponse():
...
Map<String, Uuid> topicIds = new HashMap<>();
Map<String, Uuid> oldTopicIds = cache.topicIds();
for (MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) {
    String topicName = metadata.topic();
    Uuid topicId = metadata.topicId();

    topics.add(topicName);

    // We can only reason about topic ID changes when both IDs are valid, so keep oldId null unless the new metadata contains a topic ID
    Uuid oldTopicId = null;
    if (!Uuid.ZERO_UUID.equals(topicId)) {
        topicIds.put(topicName, topicId);
        oldTopicId = oldTopicIds.get(topicName);
    } else {
        topicId = null;
    }
    ...
} {code}
With every new call to {{handleMetadataResponse()}}, {{cache.topicIds()}} gets rebuilt from scratch. When a topic is deleted and recreated immediately afterwards (because auto-creation is enabled), the producer's MetadataRequest for the deleted topic T results in a MetadataResponse with an {{UNKNOWN_TOPIC_OR_PARTITION}} or {{LEADER_NOT_AVAILABLE}} error, depending on the point during topic re-creation at which metadata is requested. In the error case, the topicId returned in the response is {{Uuid.ZERO_UUID}}. As seen in the logic above, if the received topicId is ZERO, the earlier topicId entry is dropped from the cache.

When a non-error MetadataResponse does come back for the newly created topic T, it carries a non-ZERO topicId, but the leaderEpoch of its partitions will typically be 0. This leads to rejection of the new MetadataResponse if the previously seen leaderEpoch was > 0 (for more details, refer to KAFKA-12257). Because the new metadata is rejected, the producer never gets to know the new leaders of the topic-partitions of the recreated topic.
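To make the failure mode concrete, here is a minimal, self-contained sketch of the behaviour described above. It is not the actual {{Metadata}} class; the class and method names ({{EpochRejectionSketch}}, {{acceptNewEpoch}}, {{handleErrorResponse}}) and the use of {{java.util.UUID}} are made up for illustration, and the logic only mirrors the shape of the quoted check.
{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Toy model of the client-side metadata-cache decision described above.
// NOT Kafka code; it only mirrors the shape of the quoted check.
public class EpochRejectionSketch {
    private static final Map<String, Integer> SEEN_EPOCHS = new HashMap<>();
    private static final Map<String, UUID> TOPIC_IDS = new HashMap<>();

    // Error responses (UNKNOWN_TOPIC_OR_PARTITION / LEADER_NOT_AVAILABLE) carry
    // Uuid.ZERO_UUID, so the previously cached topic ID is not carried forward.
    static void handleErrorResponse(String topic) {
        TOPIC_IDS.remove(topic);
    }

    // Accept new leader epochs only if the topic ID visibly changed
    // (both old and new IDs known) or the epoch did not go backwards.
    static boolean acceptNewEpoch(String topic, UUID newTopicId, int newEpoch) {
        UUID oldTopicId = TOPIC_IDS.get(topic);
        Integer currentEpoch = SEEN_EPOCHS.get(topic);
        boolean accept = (oldTopicId != null && !newTopicId.equals(oldTopicId))
                || currentEpoch == null || newEpoch >= currentEpoch;
        if (accept) {
            SEEN_EPOCHS.put(topic, newEpoch);
            TOPIC_IDS.put(topic, newTopicId);
        }
        return accept;
    }

    public static void main(String[] args) {
        String topic = "T";
        // 1) Healthy metadata after reassignment: leader epoch 1, topic ID X.
        System.out.println(acceptNewEpoch(topic, UUID.randomUUID(), 1)); // true
        // 2) Topic deleted: the error response makes the cache forget topic ID X.
        handleErrorResponse(topic);
        // 3) Topic recreated with a new ID but leader epoch 0: the old ID is gone,
        //    so the "ID changed" branch is skipped and 0 < 1 rejects the update.
        System.out.println(acceptNewEpoch(topic, UUID.randomUUID(), 0)); // false
    }
}
{code}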
*1. Solution / Fix (Preferred)*:

The client's metadata should keep remembering the old topicId as long as:
1) the response for the topic-partition has errors,
2) a topicId entry was already present in the cache earlier, and
3) the retain time has not expired.
{code:java}
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -336,6 +336,10 @@ public class Metadata implements Closeable {
                 topicIds.put(topicName, topicId);
                 oldTopicId = oldTopicIds.get(topicName);
             } else {
+                // Retain the old topicId for comparison with the newer topicId created later. This is only needed till retainMs
+                if (metadata.error() != Errors.NONE && oldTopicIds.get(topicName) != null && retainTopic(topicName, false, nowMs))
+                    topicIds.put(topicName, oldTopicIds.get(topicName));
+                else
                     topicId = null;
             }
{code}
*2. Alternative Solution / Fix*:

Allow updates to the leaderEpoch when the original topicId was {{null}}. This is less desirable: when a cluster moves from not using topic IDs to using topic IDs, the topic would be counted as new and the leaderEpoch would be updated regardless of whether the new epoch is greater than the current one.
{code:java}
@@ -394,7 +398,7 @@ public class Metadata implements Closeable {
         if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) {
             int newEpoch = partitionMetadata.leaderEpoch.get();
             Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-            if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) {
+            if (topicId != null && !topicId.equals(oldTopicId)) {
                 // If both topic IDs were valid and the topic ID changed, update the metadata
                 log.info("Resetting the last seen epoch of partition {} to {} since the associated topicId changed from {} to {}",
                         tp, newEpoch, oldTopicId, topicId);
{code}
From the above discussion, I think Solution 1 would be the better fix.
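As a rough illustration of why the preferred fix helps, here is the same kind of toy model as above with Solution 1 applied. Again, all names are hypothetical and the retainMs expiry check is omitted for brevity; this is a sketch of the idea, not the actual patch.
{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Companion toy model to the sketch above, with Solution 1 applied:
// the cached topic ID survives error responses (retainMs expiry omitted).
// Illustrative only; NOT Kafka code.
public class RetainTopicIdSketch {
    private static final Map<String, Integer> SEEN_EPOCHS = new HashMap<>();
    private static final Map<String, UUID> TOPIC_IDS = new HashMap<>();

    // With the fix, an error response leaves the cached topic ID in place,
    // so there is nothing to do here (contrast with the previous sketch).
    static void handleErrorResponse(String topic) {
        // intentionally keep TOPIC_IDS.get(topic)
    }

    static boolean acceptNewEpoch(String topic, UUID newTopicId, int newEpoch) {
        UUID oldTopicId = TOPIC_IDS.get(topic);
        Integer currentEpoch = SEEN_EPOCHS.get(topic);
        boolean accept = (oldTopicId != null && !newTopicId.equals(oldTopicId))
                || currentEpoch == null || newEpoch >= currentEpoch;
        if (accept) {
            SEEN_EPOCHS.put(topic, newEpoch);
            TOPIC_IDS.put(topic, newTopicId);
        }
        return accept;
    }

    public static void main(String[] args) {
        String topic = "T";
        acceptNewEpoch(topic, UUID.randomUUID(), 1); // healthy metadata: epoch 1, ID X
        handleErrorResponse(topic);                  // deletion: ID X is retained
        // Recreation: the new ID differs from the retained one, so the epoch is
        // reset and the fresh metadata is accepted instead of being rejected.
        System.out.println(acceptNewEpoch(topic, UUID.randomUUID(), 0)); // true
    }
}
{code}
Run back to back, the two sketches show the difference in outcome: the first prints false for the recreated topic's metadata, the second prints true.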
Test case to reproduce the issue:
{code:java}
@Test
def testSendWithTopicDeletionMidWay(): Unit = {
  val numRecords = 10

  // create topic with leader as 0 for the 2 partitions.
  createTopic(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))

  val reassignment = Map(
    new TopicPartition(topic, 0) -> Seq(1, 0),
    new TopicPartition(topic, 1) -> Seq(1, 0)
  )

  // Change leader to 1 for both the partitions to increase leader epoch from 0 -> 1
  zkClient.createPartitionReassignment(reassignment)
  TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress,
    "failed to remove reassign partitions path after completion")

  val producer = createProducer(brokerList, maxBlockMs = 5 * 1000L, deliveryTimeoutMs = 20 * 1000)
  (1 to numRecords).map { i =>
    val resp = producer.send(new ProducerRecord(topic, null, ("value" + i).getBytes(StandardCharsets.UTF_8))).get
    assertEquals(topic, resp.topic())
  }

  // start topic deletion
  adminZkClient.deleteTopic(topic)

  // Verify that the topic is deleted when no metadata request comes in
  TestUtils.verifyTopicDeletion(zkClient, topic, 2, servers)

  // Producer would time out and not self-recover after topic deletion.
  val e = assertThrows(classOf[ExecutionException], () => producer.send(new ProducerRecord(topic, null, ("value").getBytes(StandardCharsets.UTF_8))).get)
  assertEquals(classOf[TimeoutException], e.getCause.getClass)
}
{code}
Attaching the solution proposal and test repro as a patch file.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)