Hi, Colin,

After step 3a, do we need to update the cached offset in the leader to be the last offset in the data returned in the fetch response? If so, we need another offset index lookup, since the leader only knows that it gave out X bytes in the fetch response, but not the last offset in those X bytes.
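For concreteness, here is a minimal sketch of the flow in question (hypothetical names and helper interfaces, not Kafka's actual internals), marking where that extra lookup would come in:

    import java.util.Map;
    import java.util.UUID;
    import java.util.concurrent.ConcurrentHashMap;

    public class IncrementalFetchSketch {
        // offset -> file position in a log segment (the lookup that may require disk I/O)
        interface OffsetIndex {
            long positionFor(long offset);
        }

        // simplified view of the log read path
        interface Log {
            byte[] read(long filePosition, int maxBytes);           // step 3a: bounded by bytes, not offsets
            long lastOffsetIn(long filePosition, int lengthBytes);  // extra work: index lookup or batch decode
        }

        // in-memory cache of the next fetch offset per fetch session
        private final Map<UUID, Long> cachedOffsets = new ConcurrentHashMap<>();

        byte[] serve(UUID session, long defaultOffset, int maxBytes, OffsetIndex index, Log log) {
            // 1a. consult the cache for the offset to fetch from
            long fetchOffset = cachedOffsets.getOrDefault(session, defaultOffset);
            // 2a. translate the offset to a file position via the offset index
            long position = index.positionFor(fetchOffset);
            // 3a. read up to maxBytes from the file
            byte[] data = log.read(position, maxBytes);
            // The step being asked about: the leader knows it returned data.length bytes,
            // but to advance the cache it needs the last offset in those bytes, which takes
            // either another index lookup or decoding the last record batch in the slice.
            long lastOffset = log.lastOffsetIn(position, data.length);
            cachedOffsets.put(session, lastOffset + 1);
            return data;
        }
    }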
Thanks,

Jun

On Wed, Nov 22, 2017 at 4:01 PM, Colin McCabe <cmcc...@apache.org> wrote:

> On Wed, Nov 22, 2017, at 14:09, Jun Rao wrote:
> > Hi, Colin,
> >
> > When fetching data for a partition, the leader needs to translate the fetch offset to a position in a log segment with an index lookup. If the fetch request now also needs to cache the offset for the next fetch request, there will be an extra offset index lookup.
>
> Hmm. So the way I was thinking about it was, with an incremental fetch request, for each partition:
>
> 1a. the leader consults its cache to find the offset it needs to use for the fetch request
> 2a. the leader performs a lookup to translate the offset to a file index
> 3a. the leader reads the data from the file
>
> In contrast, with a full fetch request, for each partition:
>
> 1b. the leader looks at the FetchRequest to find the offset it needs to use for the fetch request
> 2b. the leader performs a lookup to translate the offset to a file index
> 3b. the leader reads the data from the file
>
> It seems like there is only one offset index lookup in both cases? The key point is that the cache in step #1a is not stored on disk. Or maybe I'm missing something here.
>
> best,
> Colin
>
> > The offset index lookup can potentially be expensive since it could require disk I/Os. One way to optimize this a bit is to further cache the log segment position for the next offset. The tricky issue is that for a compacted topic, the underlying log segment could have changed between two consecutive fetch requests. We could potentially make that case work, but the logic will be more complicated.
> >
> > Another thing is that it seems that the proposal only saves the metadata overhead if there are low volume topics. If we use Jay's suggestion of including 0 partitions in subsequent fetch requests, it seems that we could get the metadata saving even if all topics have continuous traffic.
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Nov 22, 2017 at 1:14 PM, Colin McCabe <cmcc...@apache.org> wrote:
> >
> > > On Tue, Nov 21, 2017, at 22:11, Jun Rao wrote:
> > > > Hi, Jay,
> > > >
> > > > I guess in your proposal the leader has to cache the last offset given back for each partition so that it knows from which offset to serve the next fetch request.
> > >
> > > Hi Jun,
> > >
> > > Just to clarify, the leader has to cache the last offset for each follower / UUID in the original KIP-227 proposal as well. Sorry if that wasn't clear.
> > >
> > > > This is doable but it means that the leader needs to do an additional index lookup per partition to serve a fetch request. Not sure if the benefit from the lighter fetch request obviously offsets the additional index lookup though.
> > >
> > > The runtime impact should be a small constant factor at most, right? You would just have a mapping between UUID and the latest offset in each partition data structure. It seems like the runtime impact of looking up the fetch offset in a hash table (or small array) in the in-memory partition data structure should be very similar to the runtime impact of looking up the fetch offset in the FetchRequest.
> > >
> > > The extra memory consumption per partition is O(num_brokers), which is essentially a small constant. (The fact that brokers can have multiple UUIDs due to parallel fetches is a small wrinkle. But we can place an upper bound on the number of UUIDs permitted per broker.)
> > >
> > > best,
> > > Colin
> > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Tue, Nov 21, 2017 at 7:03 PM, Jay Kreps <j...@confluent.io> wrote:
> > > >
> > > > > I think the general thrust of this makes a ton of sense.
> > > > >
> > > > > I don't love that we're introducing a second type of fetch request. I think the motivation is for compatibility, right? But isn't that what versioning is for? Basically to me although the modification we're making makes sense, the resulting protocol doesn't really seem like something you would design this way from scratch.
> > > > >
> > > > > I think I may be misunderstanding the semantics of the partitions in IncrementalFetchRequest. I think the intention is that the server remembers the partitions you last requested, and the partitions you specify in the request are added to this set. This is a bit odd though because you can add partitions but I don't see how you remove them, so it doesn't really let you fully make changes incrementally. I suspect I'm misunderstanding that somehow, though. You'd also need to be a little bit careful that there was no way for the server's idea of what the client is interested in and the client's idea to ever diverge as you made these modifications over time (due to bugs or whatever).
> > > > >
> > > > > It seems like an alternative would be to not add a second request, but instead change the fetch api and implementation
> > > > >
> > > > >    1. We save the partitions you last fetched on that connection in the session for the connection (as I think you are proposing)
> > > > >    2. It only gives you back info on partitions that have data or have changed (no reason you need the others, right?)
> > > > >    3. Not specifying any partitions means "give me the usual", as defined by whatever you requested before attached to the session.
> > > > >
> > > > > This would be a new version of the fetch API, so compatibility would be retained by retaining the older version as is.
> > > > >
> > > > > This seems conceptually simpler to me. It's true that you have to resend the full set whenever you want to change it, but that actually seems less error prone and that should be rare.
> > > > >
> > > > > I suspect you guys thought about this and it doesn't quite work, but maybe you could explain why?
> > > > >
> > > > > -Jay
> > > > >
> > > > > On Tue, Nov 21, 2017 at 1:02 PM, Colin McCabe <cmcc...@apache.org> wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I created a KIP to improve the scalability and latency of FetchRequest:
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
> > > > > >
> > > > > > Please take a look.
> > > > > >
> > > > > > cheers,
> > > > > > Colin
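
To make the bookkeeping discussed in the quoted thread concrete, here is a rough sketch of the per-partition state Colin describes: a small map from fetcher UUID to latest offset, with a cap on UUIDs per broker. The names and the cap value are hypothetical, not KIP-227's actual classes or limits.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;

    public class PartitionFetcherOffsets {
        // assumed cap, for illustration only; the KIP does not fix a number here
        private static final int MAX_UUIDS_PER_BROKER = 4;

        private final Map<UUID, Long> latestOffsetByUuid = new HashMap<>();
        private final Map<Integer, Integer> uuidCountByBroker = new HashMap<>();

        // step 1a in the outline above: an O(1) in-memory lookup, no disk access
        public synchronized Long latestOffset(UUID fetcherUuid) {
            return latestOffsetByUuid.get(fetcherUuid);
        }

        // register a fetcher UUID for a follower broker; memory stays O(num_brokers)
        // per partition because each broker may hold at most MAX_UUIDS_PER_BROKER entries
        public synchronized boolean register(int brokerId, UUID fetcherUuid, long startOffset) {
            int inUse = uuidCountByBroker.getOrDefault(brokerId, 0);
            boolean known = latestOffsetByUuid.containsKey(fetcherUuid);
            if (!known && inUse >= MAX_UUIDS_PER_BROKER) {
                return false;  // refuse: this broker already holds its maximum number of UUIDs
            }
            latestOffsetByUuid.put(fetcherUuid, startOffset);
            if (!known) {
                uuidCountByBroker.put(brokerId, inUse + 1);
            }
            return true;
        }

        // advance the cached offset after serving a fetch for this UUID
        public synchronized void update(UUID fetcherUuid, long latestOffset) {
            latestOffsetByUuid.put(fetcherUuid, latestOffset);
        }
    }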