[ 
https://issues.apache.org/jira/browse/KAFKA-12257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17351898#comment-17351898
 ] 

Justine Olshan commented on KAFKA-12257:
----------------------------------------

With the changes from 
[KIP-516|https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers]
there are some options to fix this bug. This ticket mentions using the topic ID as 
a key for the leader epoch map, but another option that might be a bit simpler 
to implement is checking for a new topic ID in the request.

There is already work ongoing to update the Fetch path to use topic IDs 
([https://github.com/apache/kafka/pull/9944]) and this will include storing the 
topic ID in the consumer's metadata cache.  Upon receiving a new metadata 
response, we can check if the topic ID matches the ID already stored in the 
cache and set a flag if it has changed. Then, in `updateLatestMetadata`, we 
have code that checks the epoch:
{code:java}
if (currentEpoch == null || newEpoch >= currentEpoch) {
    log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata",
            tp, currentEpoch, newEpoch);
    lastSeenLeaderEpochs.put(tp, newEpoch);
    return Optional.of(partitionMetadata);
}
{code}
If we include an OR ( || ) condition for the changed topic ID, I believe this 
will achieve the behavior this ticket is looking for: the lastSeenLeaderEpochs 
entry will be reset to the epoch of the recreated topic and we will use the new 
partitionMetadata.
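
To make that concrete, here is a rough sketch (not actual Kafka code) of what 
the modified check could look like, assuming a hypothetical topicIdChanged flag 
that the metadata-response handling would set when the cached topic ID differs 
from the one in the response:
{code:java}
// Sketch only: accept the new partition metadata when the topic ID changed,
// even if the recreated topic's leader epoch is lower than the cached one.
if (currentEpoch == null || newEpoch >= currentEpoch || topicIdChanged) {
    log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata",
            tp, currentEpoch, newEpoch);
    lastSeenLeaderEpochs.put(tp, newEpoch);
    return Optional.of(partitionMetadata);
}
{code}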

>  Consumer mishandles topics deleted and recreated with the same name
> --------------------------------------------------------------------
>
>                 Key: KAFKA-12257
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12257
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 2.2.0
>            Reporter: Ryan Leslie
>            Assignee: lqjacklee
>            Priority: Minor
>         Attachments: KAFKA-12257-1.patch, KAFKA-12257.patch
>
>
> In KAFKA-7738, caching of leader epochs (KIP-320) was added to 
> o.a.k.c.Metadata to ignore metadata responses with epochs smaller than the 
> last seen epoch.
> The current implementation can cause problems in cases where a consumer is 
> subscribed to a topic that has been deleted and then recreated with the same 
> name. This is something seen more often in consumers that subscribe to a 
> multitude of topics using a wildcard.
> Currently, when a topic is deleted and the Fetcher receives 
> UNKNOWN_TOPIC_OR_PARTITION, the leader epoch is not cleared. If, at a later 
> time while the consumer is still running, a topic is created with the same 
> name, the leader epochs are set to 0 for the new topic's partitions, and are 
> likely smaller than those for the previous topic. For example, if a broker 
> had restarted during the lifespan of the previous topic, the leader epoch 
> would be at least 1 or 2. In this case the metadata will be ignored since it 
> is incorrectly considered stale. Of course, the user will sometimes get 
> lucky, and if a topic was only recently created so that the epoch is still 0, 
> no problem will occur on recreation. The issue is also not seen when 
> consumers happen to have been restarted in between deletion and recreation.
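> To make the comparison concrete, here is a small illustration (a sketch, not 
> Kafka source) of the check with a cached epoch of 2 and a recreated topic at 
> epoch 0:
> {code:java}
> Integer currentEpoch = 2; // cached from the deleted topic, e.g. after a broker restart
> int newEpoch = 0;         // partitions of the recreated topic start at epoch 0
> if (currentEpoch == null || newEpoch >= currentEpoch) {
>     // new metadata accepted -- not reached here, since 0 < 2
> } else {
>     // the recreated topic's metadata is treated as stale and ignored
> }
> {code}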
> The most common side effect of the new metadata being disregarded is that the 
> new partitions end up assigned but the Fetcher is unable to fetch data 
> because it does not know the leaders. When recreating a topic with the same 
> name it is likely that the partition leaders are not the same as for the 
> previous topic, and the number of partitions may even be different. Besides 
> not being able to retrieve data for the new topic, there is a more sinister 
> side effect of the Fetcher triggering a metadata update after the fetch 
> fails. The subsequent update will again ignore the topic's metadata if the 
> leader epoch is still smaller than the cached value. This metadata refresh 
> loop can continue indefinitely and with a sufficient number of consumers may 
> even put a strain on a cluster since the requests are occurring in a tight 
> loop. This can also be hard for clients to identify since there is nothing 
> logged by default that would indicate what's happening. Both the Metadata 
> class's logging of "_Not replacing existing epoch_" and the Fetcher's 
> logging of "_Leader for partition <T-P> is unknown_" are at DEBUG level.
> A second possible side effect was observed: if the consumer is acting as 
> leader of the group and happens to have no current data for the previous 
> topic (e.g. it was cleared due to a metadata error from a broker failure), 
> then the new topic's partitions may simply end up unassigned within the 
> group. This is because, while the subscription list contains the recreated 
> topic, the metadata for it was previously ignored due to the leader epochs. In 
> this case the user would see logs such as:
> {noformat}
> WARN o.a.k.c.c.i.ConsumerCoordinator [Consumer clientId=myClientId, 
> groupId=myGroup] The following subscribed topics are not assigned to any 
> members: [myTopic]{noformat}
> Interestingly, I believe the Producer is less affected by this problem since 
> o.a.k.c.p.i.ProducerMetadata explicitly clears knowledge of its topics in 
> retainTopics() after each metadata expiration. ConsumerMetadata does no such 
> thing.
> To reproduce this issue:
>  # Turn on DEBUG logging, e.g. for org.apache.kafka.clients.consumer and 
> org.apache.kafka.clients.Metadata
>  # Begin a consumer for a topic (or multiple topics)
>  # Restart a broker that happens to be a leader for one of the topic's 
> partitions
>  # Delete the topic
>  # Create another topic with the same name
>  # Publish data for the new topic
>  # The consumer will not receive data for the new topic, and there will be a 
> high rate of metadata requests.
>  # The issue can be corrected by restarting the consumer or restarting 
> brokers until leader epochs are large enough
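> For reference, a rough sketch of these steps using the public Java clients 
> (assuming a local multi-broker cluster at localhost:9092; the broker restart 
> in step 3 has to be done out of band, e.g. by restarting the broker process):
> {code:java}
> import java.time.Duration;
> import java.util.Collections;
> import java.util.Properties;
> import org.apache.kafka.clients.admin.AdminClient;
> import org.apache.kafka.clients.admin.NewTopic;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> public class Kafka12257Repro {
>     public static void main(String[] args) throws Exception {
>         Properties common = new Properties();
>         common.put("bootstrap.servers", "localhost:9092");
>
>         Properties consumerProps = new Properties();
>         consumerProps.putAll(common);
>         consumerProps.put("group.id", "kafka-12257-repro");
>         consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>         consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>
>         Properties producerProps = new Properties();
>         producerProps.putAll(common);
>         producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>         producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>
>         try (AdminClient admin = AdminClient.create(common);
>              KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
>             // Step 2: create the topic (replication factor 2 so it survives a
>             // single broker restart) and start consuming it.
>             admin.createTopics(Collections.singleton(new NewTopic("myTopic", 3, (short) 2))).all().get();
>             consumer.subscribe(Collections.singleton("myTopic"));
>             consumer.poll(Duration.ofSeconds(5));
>
>             // Step 3: restart a broker leading one of myTopic's partitions (manually),
>             // so its leader epoch advances past 0, then continue.
>
>             // Steps 4-5: delete and recreate the topic while the consumer stays up.
>             admin.deleteTopics(Collections.singleton("myTopic")).all().get();
>             admin.createTopics(Collections.singleton(new NewTopic("myTopic", 3, (short) 2))).all().get();
>
>             // Step 6: publish data to the recreated topic.
>             try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
>                 producer.send(new ProducerRecord<>("myTopic", "key", "value")).get();
>             }
>
>             // Step 7: with DEBUG logging on, the consumer never fetches the new data
>             // and keeps issuing metadata requests in a tight loop.
>             while (true) {
>                 consumer.poll(Duration.ofSeconds(1));
>             }
>         }
>     }
> }
> {code}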
> I believe KIP-516 (unique topic ids) will likely fix this problem, since 
> after those changes the leader epoch map should be keyed off of the topic id, 
> rather than the name.
> One possible workaround with the current version of Kafka is to add code to 
> onPartitionsRevoked() to manually clear leader epochs before each rebalance, 
> e.g.
> {code:java}
> // Uses org.apache.commons.lang3.reflect.FieldUtils; both calls can throw
> // IllegalAccessException, which the caller would need to handle.
> Map<TopicPartition, Integer> emptyLeaderEpochs = new HashMap<>();
> ConsumerMetadata metadata = (ConsumerMetadata) FieldUtils.readField(consumer, "metadata", true);
> FieldUtils.writeField(metadata, "lastSeenLeaderEpochs", emptyLeaderEpochs, true);
> {code}
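> A rough sketch of wiring that into onPartitionsRevoked() via a rebalance 
> listener (assuming consumer is an effectively final KafkaConsumer and 
> commons-lang3's FieldUtils is on the classpath):
> {code:java}
> consumer.subscribe(topics, new ConsumerRebalanceListener() {
>     @Override
>     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>         try {
>             // Clear the cached leader epochs before each rebalance.
>             ConsumerMetadata metadata =
>                     (ConsumerMetadata) FieldUtils.readField(consumer, "metadata", true);
>             FieldUtils.writeField(metadata, "lastSeenLeaderEpochs",
>                     new HashMap<TopicPartition, Integer>(), true);
>         } catch (IllegalAccessException e) {
>             throw new RuntimeException(e);
>         }
>     }
>
>     @Override
>     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>         // nothing to do
>     }
> });
> {code}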
> This is not really recommended of course, since besides modifying private 
> consumer state, it defeats the purpose of epochs! It does in a sense revert 
> the consumer to pre-2.2 behavior before leader epochs existed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
