> On April 23, 2015, 9:51 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/OffsetManager.scala, line 124
> > <https://reviews.apache.org/r/32650/diff/1/?file=909897#file909897line124>
> >
> > A safer fix is to proactively purge as part of UpdateMetadataRequest -
> > i.e., removePartitionInfo in metadata cache.
> >
> > Your fix is nice, but we need to make sure of the following: on a given
> > offset manager (broker) the metadata cache must contain topic X before any
> > consumer of topic X (and whose group is managed by that broker) commits
> > offsets for topic X.
> >
> > The original scenario I was concerned about should be fine:
> >   - Suppose broker A (offset manager for G) starts up
> >   - It receives UpdateMetadataRequests from the controller for all topics
> >     in the cluster
> >   - It then receives LeaderAndIsrRequest for partitions of the offset
> >     topic which make it the offset manager.
> >   - We should be fine _as long as_ the update metadata requests occur
> >     first. So if we go with your approach we should at the very least add a
> >     unit test to guarantee this.
> >
> > There is another scenario. If topic X is a new topic (or has new
> > partitions):
> >   - Broker A is the offset manager for consumer group G
> >   - Broker B leads a new partition of X
> >   - Controller C sends become leader to B and update metadata to A (which
> >     will populate its metadata cache)
> >   - B becomes the leader first
> >   - A consumer starts consuming X and commits offsets to A (before it has
> >     received the update metadata request)
> >   - Other consumers in the group may rebalance while all this is
> >     happening (since new partitions for the topic appeared) and may fetch
> >     offsets from A
> >   - But A could have deleted the offset by then.
> >   - This is improbable but not impossible.
> >
> > Onur mentioned another corner case:
> > https://issues.apache.org/jira/browse/KAFKA-1787
> >
> > Both would be solved by having topic generations and incorporating
> > generation information when determining which offsets to purge. I don't
> > think we have a jira open for that but I will follow-up offline with Onur.
> >
> > Do you see any other issues?
> >
> > So I think the options are:
> >   - Go with your approach + a unit test to ensure that the controller
> >     sends update metadata request first.
> >   - Go with the more conservative fix which is to purge on
> >     metadataCache.removePartitionInfo
> >
> > Also, we should add a unit test to verify offsets are in fact removed
> > after deletion.
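
To make the ordering concern in the comment above concrete, here is a minimal Scala sketch of an offset manager that purges offsets for topics missing from its metadata cache. ToyMetadataCache, ToyOffsetManager, GroupTopicPartition and purgeDeletedTopics are hypothetical names invented for illustration; they are not the classes in OffsetManager.scala, and the purge rule is only my reading of the approach under review, not the actual patch.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for illustration only; these are not Kafka's classes.
final case class GroupTopicPartition(group: String, topic: String, partition: Int)

// Toy model of a broker's metadata cache: just the set of known topics.
final class ToyMetadataCache {
  private val topics = mutable.Set.empty[String]
  def addTopic(topic: String): Unit = topics += topic       // models an update metadata request
  def removeTopic(topic: String): Unit = topics -= topic    // models topic deletion
  def contains(topic: String): Boolean = topics.contains(topic)
}

// Toy model of an offset manager backed by that cache.
final class ToyOffsetManager(cache: ToyMetadataCache) {
  private val offsets = mutable.Map.empty[GroupTopicPartition, Long]

  // A commit is refused while the topic is unknown to this broker's cache.
  def commit(key: GroupTopicPartition, offset: Long): Boolean =
    if (cache.contains(key.topic)) { offsets(key) = offset; true } else false

  def fetch(key: GroupTopicPartition): Option[Long] = offsets.get(key)

  // Drop every offset whose topic is absent from the cache; returns how many were removed.
  def purgeDeletedTopics(): Int = {
    val stale = offsets.keys.filterNot(k => cache.contains(k.topic)).toList
    offsets --= stale
    stale.size
  }
}
```

In this toy model a purge can only remove offsets for a topic that was once in the cache and has since been removed, so a unit test along these lines would commit an offset, remove the topic, run the purge, and check that the fetch comes back empty.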

Never mind - for the second scenario we are fine. We check in offset manager
if the topic exists before committing offsets. So your fix should be fine.
Can you add a unit test?


- Joel


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32650/#review81413
-----------------------------------------------------------


On March 30, 2015, 9:47 p.m., Sriharsha Chintalapani wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32650/
> -----------------------------------------------------------
>
> (Updated March 30, 2015, 9:47 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-2000
>     https://issues.apache.org/jira/browse/KAFKA-2000
>
>
> Repository: kafka
>
>
> Description
> -------
>
> KAFKA-2000. Delete consumer offsets from kafka once the topic is deleted.
>
>
> Diffs
> -----
>
>   core/src/main/scala/kafka/server/OffsetManager.scala
>     395b1dbe43a5db47151e72a1b588d72f03cef963
>
> Diff: https://reviews.apache.org/r/32650/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Sriharsha Chintalapani
>
>