Hi James,

There are two options being discussed.

Option A is similar to the existing approach: the follower informs the leader of the offsets it has seen by asking for the next ones, and we simply skip the partitions where the offset hasn't changed.

Option B involves the leader keeping track of the offsets returned to the follower. So, when the follower does the next incremental request (with no partitions), the leader assumes that the previously returned offsets were stored by the follower. An important invariant is that the follower can only send an empty incremental fetch request if the previous response was successfully processed.

What does the follower do if there was an issue processing _some_ of the partitions in the response? The simplest option would be to send a full fetch request. An alternative would be for the follower to send an incremental fetch request with some offsets (overrides to what the leader expects), although that adds even more complexity (i.e. it's a combination of options A and B) and may not be worth it.
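To make option B (and the A/B combination) a bit more concrete, here is a rough sketch of the per-session bookkeeping the leader could do. This is not from the KIP and the names are made up, so treat it purely as an illustration:

import java.util.HashMap;
import java.util.Map;

// Rough sketch of the leader-side state for option B (and the A/B hybrid).
// All names here are hypothetical; this is not from the KIP.
public class FetchSessionSketch {
    // Per fetch session: partition -> next offset the leader expects the
    // follower to need, based on what it returned in the last response.
    private final Map<String, Long> expectedNextOffset = new HashMap<>();

    // Called when the leader puts data for a partition into a fetch response;
    // nextOffset is the offset just after the last message returned.
    public void onDataReturned(String topicPartition, long nextOffset) {
        expectedNextOffset.put(topicPartition, nextOffset);
    }

    // Called when the next incremental fetch request arrives. A missing
    // offset means "use what you returned last time" (option B); an explicit
    // offset overrides the cached value (the option A/B combination).
    public long resolveFetchOffset(String topicPartition, Long offsetFromRequest) {
        if (offsetFromRequest != null) {
            expectedNextOffset.put(topicPartition, offsetFromRequest);
            return offsetFromRequest;
        }
        // Option B: no offset sent, so the leader assumes the follower stored
        // everything from the previous response.
        return expectedNextOffset.getOrDefault(topicPartition, 0L);
    }

    public static void main(String[] args) {
        FetchSessionSketch session = new FetchSessionSketch();
        session.onDataReturned("topicA-0", 13L);                           // returned messages 10,11,12
        System.out.println(session.resolveFetchOffset("topicA-0", null));  // 13 (option B)
        System.out.println(session.resolveFetchOffset("topicA-0", 10L));   // 10 (explicit override)
    }
}

If the follower fails to process part of a response and falls back to a full fetch request, the leader would simply rebuild this map from the offsets in that request.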
Ismael

On Thu, Nov 23, 2017 at 4:58 AM, James Cheng <wushuja...@gmail.com> wrote:

> I think the discussion may have already covered this but just in case...
>
> How does the leader decide when a newly written message is "committed" enough to hand out to consumers?
>
> When a message is produced and is stored to the disk of the leader, the message is not considered "committed" until it has hit all replicas in the ISR. Only at that point will the leader decide to hand out the message to normal consumers.
>
> In the current protocol, I believe the leader has to wait for 2 fetch requests from a follower before it considers the message committed: one to fetch the uncommitted message, and another to fetch anything after that. It is the fetch offset in the 2nd fetch that tells the leader that the follower now has the uncommitted message.
>
> As an example:
> 1a. Newly produced messages at offsets 10,11,12. Saved to leader, not yet replicated to followers.
> 2a. Follower asks for messages starting at offset 10. Leader hands out messages 10,11,12.
> 3a. Follower asks for messages starting at offset 13. Based on that fetch request, the leader concludes that the follower already has messages 10,11,12, and so will now hand messages 10,11,12 out to consumers.
>
> How will the new protocol handle that? How will the leader know that the follower already has messages 10,11,12?
>
> In particular, how will the new protocol handle the case when not all partitions are returned in each request?
>
> Another example:
> 1b. Newly produced messages to topic A at offsets 10,11,12. Saved to leader, not yet replicated to followers.
> 2b. Newly produced 1MB message to topic B at offset 100. Saved to leader, not yet replicated to follower.
> 3b. Follower asks for messages from topic A starting at offset 10, and messages from topic B starting at offset 100.
> 4b. Leader decides to send to the follower the 1MB message at topic B offset 100. Due to replica.fetch.max.bytes, it only sends that single message to the follower.
> 5b. Follower asks for messages from topic A starting at offset 10, and messages from topic B starting at offset 101. Leader concludes that topic B offset 100 has been replicated and so can be handed out to consumers. Topic A messages 10,11,12 are not yet replicated and so cannot yet be handed out to consumers.
>
> In this particular case, the follower made no progress on replicating the new messages from topic A.
>
> How will the new protocol handle this scenario?
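To make the first example above concrete under option B (again, just a sketch with made-up names, not from the KIP): after returning messages 10,11,12 the leader caches 13 as the offset it expects the follower to fetch next. The follower's next incremental fetch, even one listing no partitions, confirms that the previous response was processed, so the leader can treat 13 as the follower's log end offset and advance the high watermark as usual to the minimum log end offset across the ISR. In the second example nothing was returned for topic A, so its cached offset stays at 10 and messages 10,11,12 remain uncommitted until they are actually sent and confirmed.

import java.util.Collection;
import java.util.List;

// Sketch of how option B could answer example 1a-3a above. Hypothetical, not
// from the KIP: the leader advances the high watermark once the follower's
// next incremental fetch (even an empty one) confirms the previously
// returned offsets.
public class HighWatermarkSketch {
    // The high watermark is the smallest log end offset across the ISR.
    static long highWatermark(Collection<Long> isrLogEndOffsets) {
        return isrLogEndOffsets.stream().mapToLong(Long::longValue).min().orElse(0L);
    }

    public static void main(String[] args) {
        long leaderLeo = 13;   // leader has 10, 11, 12; its next offset is 13
        long followerLeo = 10; // follower has not replicated them yet
        System.out.println(highWatermark(List.of(leaderLeo, followerLeo))); // 10

        // Leader returns 10, 11, 12 and caches 13 for the follower's session.
        // The follower's next (possibly empty) incremental fetch confirms that
        // it stored them, so the leader can treat 13 as the follower's LEO.
        followerLeo = 13;
        System.out.println(highWatermark(List.of(leaderLeo, followerLeo))); // 13
    }
}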
> -James
>
> > On Nov 22, 2017, at 7:54 PM, Colin McCabe <cmcc...@apache.org> wrote:
> >
> > Oh, I see the issue now. The broker uses sendfile() and sends some message data without knowing what the ending offset is. To learn that, we would need another index access.
> >
> > However, when we do that index->offset lookup, we know that the next offset->index lookup (done in the following fetch request) will be for the same offset. So we should be able to cache the result (the index). Also: Does the operating system’s page cache help us here?
> >
> > Best,
> > Colin
> >
> > On Wed, Nov 22, 2017, at 16:53, Jun Rao wrote:
> >> Hi, Colin,
> >>
> >> After step 3a, do we need to update the cached offset in the leader to be the last offset in the data returned in the fetch response? If so, we need another offset index lookup since the leader only knows that it gives out X bytes in the fetch response, but not the last offset in those X bytes.
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >> On Wed, Nov 22, 2017 at 4:01 PM, Colin McCabe <cmcc...@apache.org> wrote:
> >>> On Wed, Nov 22, 2017, at 14:09, Jun Rao wrote:
> >>>> Hi, Colin,
> >>>>
> >>>> When fetching data for a partition, the leader needs to translate the fetch offset to a position in a log segment with an index lookup. If the fetch request now also needs to cache the offset for the next fetch request, there will be an extra offset index lookup.
> >>>
> >>> Hmm. So the way I was thinking about it was, with an incremental fetch request, for each partition:
> >>>
> >>> 1a. the leader consults its cache to find the offset it needs to use for the fetch request
> >>> 2a. the leader performs a lookup to translate the offset to a file index
> >>> 3a. the leader reads the data from the file
> >>>
> >>> In contrast, with a full fetch request, for each partition:
> >>>
> >>> 1b. the leader looks at the FetchRequest to find the offset it needs to use for the fetch request
> >>> 2b. the leader performs a lookup to translate the offset to a file index
> >>> 3b. the leader reads the data from the file
> >>>
> >>> It seems like there is only one offset index lookup in both cases? The key point is that the cache in step #1a is not stored on disk. Or maybe I'm missing something here.
> >>>
> >>> best,
> >>> Colin
> >>>
> >>>> The offset index lookup can potentially be expensive since it could require disk I/Os. One way to optimize this a bit is to further cache the log segment position for the next offset. The tricky issue is that for a compacted topic, the underlying log segment could have changed between two consecutive fetch requests. We could potentially make that case work, but the logic will be more complicated.
> >>>>
> >>>> Another thing is that it seems that the proposal only saves the metadata overhead if there are low volume topics.
> >>>> If we use Jay's suggestion of including 0 partitions in subsequent fetch requests, it seems that we could get the metadata saving even if all topics have continuous traffic.
> >>>>
> >>>> Thanks,
> >>>>
> >>>> Jun
> >>>>
> >>>> On Wed, Nov 22, 2017 at 1:14 PM, Colin McCabe <cmcc...@apache.org> wrote:
> >>>>
> >>>>> On Tue, Nov 21, 2017, at 22:11, Jun Rao wrote:
> >>>>>> Hi, Jay,
> >>>>>>
> >>>>>> I guess in your proposal the leader has to cache the last offset given back for each partition so that it knows from which offset to serve the next fetch request.
> >>>>>
> >>>>> Hi Jun,
> >>>>>
> >>>>> Just to clarify, the leader has to cache the last offset for each follower / UUID in the original KIP-227 proposal as well. Sorry if that wasn't clear.
> >>>>>
> >>>>>> This is doable but it means that the leader needs to do an additional index lookup per partition to serve a fetch request. Not sure if the benefit from the lighter fetch request obviously offsets the additional index lookup though.
> >>>>>
> >>>>> The runtime impact should be a small constant factor at most, right? You would just have a mapping between UUID and the latest offset in each partition data structure. It seems like the runtime impact of looking up the fetch offset in a hash table (or small array) in the in-memory partition data structure should be very similar to the runtime impact of looking up the fetch offset in the FetchRequest.
> >>>>>
> >>>>> The extra memory consumption per partition is O(num_brokers), which is essentially a small constant. (The fact that brokers can have multiple UUIDs due to parallel fetches is a small wrinkle. But we can place an upper bound on the number of UUIDs permitted per broker.)
> >>>>>
> >>>>> best,
> >>>>> Colin
> >>>>>
> >>>>>> Thanks,
> >>>>>>
> >>>>>> Jun
> >>>>>>
> >>>>>> On Tue, Nov 21, 2017 at 7:03 PM, Jay Kreps <j...@confluent.io> wrote:
> >>>>>>
> >>>>>>> I think the general thrust of this makes a ton of sense.
> >>>>>>>
> >>>>>>> I don't love that we're introducing a second type of fetch request. I think the motivation is for compatibility, right? But isn't that what versioning is for? Basically to me although the modification we're making makes sense, the resulting protocol doesn't really seem like something you would design this way from scratch.
> >>>>>>>
> >>>>>>> I think I may be misunderstanding the semantics of the partitions in IncrementalFetchRequest. I think the intention is that the server remembers the partitions you last requested, and the partitions you specify in the request are added to this set. This is a bit odd though because you can add partitions but I don't see how you remove them, so it doesn't really let you fully make changes incrementally. I suspect I'm misunderstanding that somehow, though.
> >>>>>>> You'd also need to be a little bit careful that there was no way for the server's idea of what the client is interested in and the client's idea to ever diverge as you made these modifications over time (due to bugs or whatever).
> >>>>>>>
> >>>>>>> It seems like an alternative would be to not add a second request, but instead change the fetch api and implementation
> >>>>>>>
> >>>>>>>    1. We save the partitions you last fetched on that connection in the session for the connection (as I think you are proposing)
> >>>>>>>    2. It only gives you back info on partitions that have data or have changed (no reason you need the others, right?)
> >>>>>>>    3. Not specifying any partitions means "give me the usual", as defined by whatever you requested before attached to the session.
> >>>>>>>
> >>>>>>> This would be a new version of the fetch API, so compatibility would be retained by retaining the older version as is.
> >>>>>>>
> >>>>>>> This seems conceptually simpler to me. It's true that you have to resend the full set whenever you want to change it, but that actually seems less error prone and that should be rare.
> >>>>>>>
> >>>>>>> I suspect you guys thought about this and it doesn't quite work, but maybe you could explain why?
> >>>>>>>
> >>>>>>> -Jay
> >>>>>>>
> >>>>>>> On Tue, Nov 21, 2017 at 1:02 PM, Colin McCabe <cmcc...@apache.org> wrote:
> >>>>>>>
> >>>>>>>> Hi all,
> >>>>>>>>
> >>>>>>>> I created a KIP to improve the scalability and latency of FetchRequest:
> >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
> >>>>>>>>
> >>>>>>>> Please take a look.
> >>>>>>>>
> >>>>>>>> cheers,
> >>>>>>>> Colin