[ 
https://issues.apache.org/jira/browse/KAFKA-13488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17450618#comment-17450618
 ] 

Justine Olshan commented on KAFKA-13488:
----------------------------------------

To clarify, did KAFKA-12257 make this issue worse, or did it just not fix it all 
the way? 

> Producer fails to recover if topic gets deleted (and gets auto-created)
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-13488
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13488
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 2.2.2, 2.3.1, 2.4.1, 2.5.1, 2.6.3, 2.7.2, 2.8.1
>            Reporter: Prateek Agarwal
>            Assignee: Prateek Agarwal
>            Priority: Major
>         Attachments: KAFKA-13488.patch
>
>
> The producer currently fails to produce messages to a topic if the topic is 
> deleted and gets auto-created OR is created manually again during the lifetime 
> of the producer (provided certain other conditions are met: the leaderEpochs of 
> the deleted topic were > 0).
>  
> Steps to reproduce:
> 0) Set up a cluster with 2 brokers, 0 and 1, with auto.create.topics.enable=true.
> 1) Create a topic T with 2 partitions: P0 -> (0,1), P1 -> (0,1).
> 2) Reassign the partitions such that P0 -> (1,0), P1 -> (1,0).
> 3) Create a producer P and send a few messages, which land on all the TPs of topic T.
> 4) Delete the topic T.
> 5) Immediately send a new message from producer P; this message will fail to 
> send and eventually time out.
> A test case which fails with the above steps is added at the end, as well as a 
> patch file.
>  
> This happens since leaderEpoch (KIP-320) was introduced in the 
> MetadataResponse (KAFKA-7738). KAFKA-12257 attempted to fix this issue, but 
> the fix has a bug due to which the above use case still fails.
>  
> *Issue in the solution of KAFKA-12257:*
> {code:java}
> // org.apache.kafka.clients.Metadata.handleMetadataResponse():
>         ...
>         Map<String, Uuid> topicIds = new HashMap<>();
>         Map<String, Uuid> oldTopicIds = cache.topicIds();
>         for (MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) {
>             String topicName = metadata.topic();
>             Uuid topicId = metadata.topicId();
>             topics.add(topicName);
>             // We can only reason about topic ID changes when both IDs are valid,
>             // so keep oldId null unless the new metadata contains a topic ID
>             Uuid oldTopicId = null;
>             if (!Uuid.ZERO_UUID.equals(topicId)) {
>                 topicIds.put(topicName, topicId);
>                 oldTopicId = oldTopicIds.get(topicName);
>             } else {
>                 topicId = null;
>             }
>             ...
>         }
> {code}
> With every new call to {{handleMetadataResponse()}}, the {{cache.topicIds()}} 
> map gets rebuilt from scratch. When a topic is deleted and recreated immediately 
> afterwards (because auto-create is enabled), the producer's {{MetadataRequest}} 
> for the deleted topic T will result in a {{MetadataResponse}} with an 
> {{UNKNOWN_TOPIC_OR_PARTITION}} or {{LEADER_NOT_AVAILABLE}} error, depending on 
> the point during the topic recreation at which the metadata is requested. For 
> error responses, the topicId returned in the response is {{Uuid.ZERO_UUID}}. As 
> seen in the logic above, when the received topicId is ZERO, the method drops the 
> earlier topicId entry from the cache.
> Now, when a non-error MetadataResponse does come back for the newly created 
> topic T, it carries a non-ZERO topicId, but the leaderEpoch of its partitions 
> will usually be ZERO. This leads to the new MetadataResponse being rejected if 
> the older leaderEpoch was > 0 (for more details, refer to KAFKA-12257). Because 
> the metadata is rejected, the producer never learns the new leaders of the TPs 
> of the newly created topic.
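>  
> To make the failure concrete, below is a minimal, standalone model of the 
> leader-epoch check (the same check quoted in the second diff further below). The 
> class {{EpochRejectionSketch}} and the method {{acceptNewEpoch}} are 
> illustrative names only, not the actual client code. Once the old topicId has 
> been dropped, the "topicId changed" branch can never fire, so the recreated 
> topic's epoch 0 loses against the cached epoch 1 and the metadata is rejected:
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
> import java.util.UUID;
> 
> // Standalone model of the leader-epoch acceptance check; names are illustrative.
> public class EpochRejectionSketch {
>     // Last seen leader epoch per partition, as the client metadata cache keeps it.
>     static final Map<String, Integer> lastSeenLeaderEpochs = new HashMap<>();
> 
>     // Accept the new epoch if the topicId changed (both IDs known),
>     // or if newEpoch >= currentEpoch; otherwise reject the new metadata.
>     static boolean acceptNewEpoch(String tp, int newEpoch, UUID topicId, UUID oldTopicId) {
>         Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
>         if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId))
>             return true; // topic was recreated under a new id: reset the epoch
>         return currentEpoch == null || newEpoch >= currentEpoch;
>     }
> 
>     public static void main(String[] args) {
>         lastSeenLeaderEpochs.put("T-0", 1); // the reassignment bumped the epoch to 1
> 
>         // The error response (UNKNOWN_TOPIC_OR_PARTITION / LEADER_NOT_AVAILABLE)
>         // carried Uuid.ZERO_UUID, so the cached topicId was dropped: oldTopicId is null.
>         UUID oldTopicId = null;
> 
>         // The recreated topic has a fresh topicId and its leaderEpoch starts again at 0.
>         UUID newTopicId = UUID.randomUUID();
> 
>         // Prints false: the new metadata is rejected, so the producer never learns
>         // the new leader and keeps timing out.
>         System.out.println(acceptNewEpoch("T-0", 0, newTopicId, oldTopicId));
>     }
> }
> {code}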
>  
> *1. Solution / Fix (Preferred)*:
> The client's metadata should keep remembering the old topicId as long as:
> 1) the response for the topic has errors,
> 2) a topicId entry was already present in the cache earlier, and
> 3) the retain time has not expired.
> {code:java}
> --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
> +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
> @@ -336,6 +336,10 @@ public class Metadata implements Closeable {
>                  topicIds.put(topicName, topicId);
>                  oldTopicId = oldTopicIds.get(topicName);
>              } else {
> +                // Retain the old topicId for comparison with the newer topicId created later. This is only needed till retainMs.
> +                if (metadata.error() != Errors.NONE && oldTopicIds.get(topicName) != null && retainTopic(topicName, false, nowMs))
> +                    topicIds.put(topicName, oldTopicIds.get(topicName));
> +                else
>                      topicId = null;
>              }
> {code}
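> In terms of the sketch above, with this fix the oldTopicId stays non-null across 
> the error responses, so once the recreated topic's metadata arrives the "topicId 
> changed" branch fires, the last seen epoch is reset to the new topic's epoch, and 
> the producer picks up the new leaders. Gating the retention on 
> {{retainTopic()}} means the stale id is kept only for topics the client still 
> tracks, and only until the retain time expires.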
> *2. Alternative Solution / Fix*:
> Allow updates to the leaderEpoch when the original topicId was {{null}}. This 
> is less desirable: when a cluster moves from not using topic IDs to using topic 
> IDs, we would count such a topic as new and update the leaderEpoch irrespective 
> of whether the new epoch is greater than the current one.
> {code:java}
> @@ -394,7 +398,7 @@ public class Metadata implements Closeable {
>          if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) {
>              int newEpoch = partitionMetadata.leaderEpoch.get();
>              Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
> -            if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) {
> +            if (topicId != null && !topicId.equals(oldTopicId)) {
>                  // If both topic IDs were valid and the topic ID changed, update the metadata
>                  log.info("Resetting the last seen epoch of partition {} to {} since the associated topicId changed from {} to {}",
>                           tp, newEpoch, oldTopicId, topicId);
> {code}
> From the above discussion, I think Solution 1 is the better fix.
> –
> Test case to reproduce the issue:
> {code:scala}
>   @Test
>   def testSendWithTopicDeletionMidWay(): Unit = {
>     val numRecords = 10
>     // create topic with leader as 0 for the 2 partitions
>     createTopic(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))
>     val reassignment = Map(
>       new TopicPartition(topic, 0) -> Seq(1, 0),
>       new TopicPartition(topic, 1) -> Seq(1, 0)
>     )
>     // Change leader to 1 for both the partitions to increase leader epoch from 0 -> 1
>     zkClient.createPartitionReassignment(reassignment)
>     TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress,
>       "failed to remove reassign partitions path after completion")
>     val producer = createProducer(brokerList, maxBlockMs = 5 * 1000L, deliveryTimeoutMs = 20 * 1000)
>     (1 to numRecords).map { i =>
>       val resp = producer.send(new ProducerRecord(topic, null, ("value" + i).getBytes(StandardCharsets.UTF_8))).get
>       assertEquals(topic, resp.topic())
>     }
>     // start topic deletion
>     adminZkClient.deleteTopic(topic)
>     // Verify that the topic is deleted when no metadata request comes in
>     TestUtils.verifyTopicDeletion(zkClient, topic, 2, servers)
> 
>     // Producer would time out and not self-recover after topic deletion.
>     val e = assertThrows(classOf[ExecutionException], () => producer.send(new ProducerRecord(topic, null, "value".getBytes(StandardCharsets.UTF_8))).get)
>     assertEquals(classOf[TimeoutException], e.getCause.getClass)
>   }
> {code}
> Attaching the solution proposal and test repro as a patch file.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
