[ https://issues.apache.org/jira/browse/KAFKA-12257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17351898#comment-17351898 ]
Justine Olshan commented on KAFKA-12257:
----------------------------------------

With the changes from [KIP-516|https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers] there are some options for fixing this bug. This ticket mentions using the topic ID as a key for the leader epoch map, but another option that might be a bit simpler to implement is checking for a new topic ID in the request. There is already ongoing work to update the Fetch path to use topic IDs ([https://github.com/apache/kafka/pull/9944]), and it will include storing the topic ID in the consumer's metadata cache. Upon receiving a new metadata response, we can check whether the topic ID matches the ID already stored in the cache and set a flag if it has changed. Then, in `updateLatestMetadata`, there is code that checks the epoch:

{code:java}
if (currentEpoch == null || newEpoch >= currentEpoch) {
    log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
    lastSeenLeaderEpochs.put(tp, newEpoch);
    return Optional.of(partitionMetadata);
}
{code}

If we include an or ( || ) for the changed topic ID, I believe this will achieve the behavior this ticket is looking for: the lastSeenEpoch will be reset to the epoch of the new topic and we will use the new partitionMetadata.

> Consumer mishandles topics deleted and recreated with the same name
> --------------------------------------------------------------------
>
>                 Key: KAFKA-12257
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12257
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 2.2.0
>            Reporter: Ryan Leslie
>            Assignee: lqjacklee
>            Priority: Minor
>         Attachments: KAFKA-12257-1.patch, KAFKA-12257.patch
>
>
> In KAFKA-7738, caching of leader epochs (KIP-320) was added to
> o.a.k.c.Metadata to ignore metadata responses with epochs smaller than the
> last seen epoch.
> The current implementation can cause problems in cases where a consumer is
> subscribed to a topic that has been deleted and then recreated with the same
> name. This is something seen more often in consumers that subscribe to a
> multitude of topics using a wildcard.
>
> Currently, when a topic is deleted and the Fetcher receives
> UNKNOWN_TOPIC_OR_PARTITION, the leader epoch is not cleared. If at a later
> time, while the consumer is still running, a topic is created with the same
> name, the leader epochs are set to 0 for the new topic's partitions, and are
> likely smaller than those for the previous topic. For example, if a broker
> had restarted during the lifespan of the previous topic, the leader epoch
> would be at least 1 or 2. In this case the metadata will be ignored since it
> is incorrectly considered stale. Of course, the user will sometimes get
> lucky, and if a topic was only recently created so that the epoch is still 0,
> no problem will occur on recreation. The issue is also not seen when
> consumers happen to have been restarted in between deletion and recreation.
>
> The most common side effect of the new metadata being disregarded is that the
> new partitions end up assigned but the Fetcher is unable to fetch data
> because it does not know the leaders. When recreating a topic with the same
> name it is likely that the partition leaders are not the same as for the
> previous topic, and the number of partitions may even be different. Besides
> not being able to retrieve data for the new topic, there is a more sinister
> side effect of the Fetcher triggering a metadata update after the fetch
> fails. The subsequent update will again ignore the topic's metadata if the
> leader epoch is still smaller than the cached value. This metadata refresh
> loop can continue indefinitely, and with a sufficient number of consumers may
> even put a strain on a cluster since the requests are occurring in a tight
> loop.
> This can also be hard for clients to identify since there is nothing
> logged by default that would indicate what's happening. Both the Metadata
> class's logging of "_Not replacing existing epoch_" and the Fetcher's
> logging of "_Leader for partition <T-P> is unknown_" are at DEBUG level.
>
> A second possible side effect was observed where if the consumer is acting as
> leader of the group and happens to not have any current data for the previous
> topic, e.g. it was cleared due to a metadata error from a broker failure,
> then the new topic's partitions may simply end up unassigned within the
> group. This is because while the subscription list contains the recreated
> topic, the metadata for it was previously ignored due to the leader epochs. In
> this case the user would see logs such as:
> {noformat}
> WARN o.a.k.c.c.i.ConsumerCoordinator [Consumer clientId=myClientId,
> groupId=myGroup] The following subscribed topics are not assigned to any
> members: [myTopic]{noformat}
> Interestingly, I believe the Producer is less affected by this problem since
> o.a.k.c.p.i.ProducerMetadata explicitly clears knowledge of its topics in
> retainTopics() after each metadata expiration. ConsumerMetadata does no such
> thing.
>
> To reproduce this issue:
> # Turn on DEBUG logging, e.g. for org.apache.kafka.clients.consumer and
> org.apache.kafka.clients.Metadata
> # Begin a consumer for a topic (or multiple topics)
> # Restart a broker that happens to be a leader for one of the topic's
> partitions
> # Delete the topic
> # Create another topic with the same name
> # Publish data for the new topic
> # The consumer will not receive data for the new topic, and there will be a
> high rate of metadata requests.
> # The issue can be corrected by restarting the consumer or restarting
> brokers until leader epochs are large enough
>
> I believe KIP-516 (unique topic ids) will likely fix this problem, since
> after those changes the leader epoch map should be keyed off of the topic id
> rather than the name.
>
> One possible workaround with the current version of Kafka is to add code to
> onPartitionsRevoked() to manually clear leader epochs before each rebalance,
> e.g.
> {code:java}
> Map<TopicPartition, Integer> emptyLeaderEpochs = new HashMap<>();
> ConsumerMetadata metadata = (ConsumerMetadata) FieldUtils.readField(consumer, "metadata", true);
> FieldUtils.writeField(metadata, "lastSeenLeaderEpochs", emptyLeaderEpochs, true);{code}
> This is not really recommended of course, since besides modifying private
> consumer state, it defeats the purpose of epochs! It does in a sense revert
> the consumer to pre-2.2 behavior before leader epochs existed.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
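The extended epoch check proposed in the comment above can be sketched in isolation as follows. This is only an illustration, not the actual patch: the class `EpochCheckSketch`, the method `shouldApplyMetadata`, and the precomputed `topicIdChanged` flag are hypothetical stand-ins for the real logic in `org.apache.kafka.clients.Metadata#updateLatestMetadata`, where the comparison would be made against topic IDs stored in the consumer's metadata cache.

```java
// Hypothetical standalone sketch of the epoch check with the proposed
// "|| for the changed topic ID" added. Names here are illustrative only.
public class EpochCheckSketch {

    // Returns true when partition metadata from a new metadata response
    // should replace the cached entry (i.e. the last seen epoch is updated).
    static boolean shouldApplyMetadata(Integer currentEpoch, int newEpoch, boolean topicIdChanged) {
        // Existing rule: accept metadata whose leader epoch is equal to or
        // newer than the last seen epoch (or when no epoch is cached yet).
        // Proposed addition: also accept it when the topic ID differs from
        // the cached one, since a deleted-and-recreated topic legitimately
        // restarts its leader epochs at 0.
        return currentEpoch == null || newEpoch >= currentEpoch || topicIdChanged;
    }
}
```

With this change, the recreated topic's epoch-0 metadata is no longer treated as stale, so lastSeenLeaderEpochs is reset and the new partitionMetadata is used, breaking the metadata refresh loop described in the issue.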