-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32650/#review81413
-----------------------------------------------------------

core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/32650/#comment131755>

A safer fix is to proactively purge as part of UpdateMetadataRequest - i.e., in removePartitionInfo in the metadata cache.

Your fix is nice, but we need to make sure of the following: on a given offset manager (broker), the metadata cache must contain topic X before any consumer of topic X (whose group is managed by that broker) commits offsets for topic X.

The original scenario I was concerned about should be fine:
- Suppose broker A (offset manager for group G) starts up.
- It receives UpdateMetadataRequests from the controller for all topics in the cluster.
- It then receives a LeaderAndIsrRequest for partitions of the offsets topic, which makes it the offset manager.
- We should be fine _as long as_ the update metadata requests arrive first. So if we go with your approach we should at the very least add a unit test to guarantee that ordering.

There is another scenario. If topic X is a new topic (or has new partitions):
- Broker A is the offset manager for consumer group G.
- Broker B leads a new partition of X.
- Controller C sends a become-leader request to B and an update metadata request to A (which will populate A's metadata cache).
- B becomes the leader first.
- A consumer starts consuming X and commits offsets to A (before A has received the update metadata request).
- Other consumers in the group may rebalance while all this is happening (since new partitions for the topic appeared) and may fetch offsets from A.
- But A could have deleted those offsets by then.
- This is improbable but not impossible.

Onur mentioned another corner case: https://issues.apache.org/jira/browse/KAFKA-1787

Both would be solved by having topic generations and incorporating generation information when determining which offsets to purge. I don't think we have a jira open for that, but I will follow up offline with Onur.

Do you see any other issues? I think the options are:
- Go with your approach, plus a unit test to ensure that the controller sends the update metadata request first.
- Go with the more conservative fix, which is to purge on metadataCache.removePartitionInfo (a rough sketch of this option follows the quoted review below).

Also, we should add a unit test to verify that offsets are in fact removed after topic deletion.


- Joel Koshy


On March 30, 2015, 9:47 p.m., Sriharsha Chintalapani wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32650/
> -----------------------------------------------------------
>
> (Updated March 30, 2015, 9:47 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-2000
>     https://issues.apache.org/jira/browse/KAFKA-2000
>
>
> Repository: kafka
>
>
> Description
> -------
>
> KAFKA-2000. Delete consumer offsets from kafka once the topic is deleted.
>
>
> Diffs
> -----
>
>   core/src/main/scala/kafka/server/OffsetManager.scala 395b1dbe43a5db47151e72a1b588d72f03cef963
>
> Diff: https://reviews.apache.org/r/32650/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Sriharsha Chintalapani
>
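
To make the conservative option above concrete, here is a rough, self-contained sketch of the idea - purging a topic's committed offsets at the same point the metadata cache drops that topic's partition info. This is only an illustration: SimpleOffsetCache, SimpleMetadataCache, GroupTopicPartition, and the method names below are made-up stand-ins, not the actual OffsetManager/MetadataCache code touched by this patch.

    import scala.collection.concurrent.TrieMap

    // Hypothetical key: one committed offset per (group, topic, partition).
    case class GroupTopicPartition(group: String, topic: String, partition: Int)

    // Hypothetical stand-in for the offset manager's in-memory offsets cache.
    class SimpleOffsetCache {
      private val offsets = TrieMap.empty[GroupTopicPartition, Long]

      def putOffset(key: GroupTopicPartition, offset: Long): Unit =
        offsets.update(key, offset)

      def getOffset(key: GroupTopicPartition): Option[Long] =
        offsets.get(key)

      // Drop every committed offset that belongs to the given (deleted) topic
      // and return how many entries were purged.
      def removeOffsetsForTopic(topic: String): Int = {
        val stale = offsets.keys.filter(_.topic == topic).toList
        stale.foreach(offsets.remove)
        stale.size
      }
    }

    // Hypothetical stand-in for the broker's metadata cache.
    class SimpleMetadataCache(offsetCache: SimpleOffsetCache) {
      private val livePartitions = TrieMap.empty[String, Set[Int]]

      def updateTopic(topic: String, partitions: Set[Int]): Unit =
        livePartitions.update(topic, partitions)

      // Mirrors the shape of the removePartitionInfo idea: when the
      // controller's UpdateMetadataRequest says the topic is gone, drop its
      // partition info and purge its committed offsets in the same step.
      def removeTopic(topic: String): Unit = {
        livePartitions.remove(topic)
        offsetCache.removeOffsetsForTopic(topic)
      }
    }

Since the purge only runs from the metadata cache's removal path, it cannot race ahead of the UpdateMetadataRequest that triggers it, which is the ordering concern described above. A unit test for either option then boils down to: commit an offset, simulate the topic deletion (removeTopic here), and assert that getOffset returns None.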