On Fri, Nov 24, 2017, at 22:06, Dong Lin wrote: > Hey Colin, > > Thanks for the reply! Please see my comment inline. > > On Fri, Nov 24, 2017 at 9:39 PM, Colin McCabe <cmcc...@apache.org> wrote: > > > On Thu, Nov 23, 2017, at 18:35, Dong Lin wrote: > > > Hey Colin, > > > > > > Thanks for the KIP! This is definitely useful when there are many idle > > > partitions in the clusters. > > > > > > Just in case it is useful, I will provide some numbers here. We observe > > > that for a cluster that has around 2.5k partitions per broker, the > > > ProduceRequestTotalTime average value is around 25 ms. For a cluster > > > with 2.5k partitions per broker whose AllTopicsBytesInRate is only > > around 6 > > > MB/s, the ProduceRequestTotalTime average value is around 180 ms, most of > > > which is spent on ProduceRequestRemoteTime. The increased > > > ProduceRequestTotalTime significantly reduces the throughput of producers > > > with acks=all. I think this KIP can help address this problem. > > > > Hi Dong, > > > > Thanks for the numbers. It's good to have empirical confirmation that > > this will help! > > > > > > > > Here are some of my ideas on the current KIP: > > > > > > - The KIP says that the follower will include a partition in > > > the IncrementalFetchRequest if the LEO of the partition has been updated. > > > It seems that doing so may prevent the leader from knowing information (e.g. > > > LogStartOffset) of the follower that would otherwise be included in the > > > FetchRequest. Maybe we should have a paragraph to explicitly define the > > > full criteria of when the fetcher should include a partition in the > > > FetchRequest, and probably include logStartOffset as part of the > > > criteria? > > > > Hmm. That's a good point... we should think about whether we need to > > send partition information in an incremental update when the LSO > > changes. > > > > Sorry if this is a dumb question, but what does the leader do with the > > logStartOffset of the followers? When does the leader need to know it? > > Also, how often do we expect it to be changed by the LogCleaner? > > >
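To make the inclusion criterion being debated here concrete, a minimal Java sketch (the class and field names are hypothetical, not from the KIP): the follower would add a partition to an incremental fetch request whenever its LEO or its logStartOffset has moved since the value last propagated to the leader.

// Hypothetical names; a minimal sketch of the follower-side inclusion criterion
// under discussion, not code from the KIP.
public final class PartitionFetchState {
    long lastPropagatedLogEndOffset;    // LEO the leader already knows about
    long lastPropagatedLogStartOffset;  // logStartOffset the leader already knows about
    long currentLogEndOffset;
    long currentLogStartOffset;

    // Include the partition in the next incremental FetchRequest if either the LEO
    // or the logStartOffset changed, so the leader can keep tracking follower log
    // start offsets (which it needs for DeleteRecordsRequest handling, see below).
    boolean shouldIncludeInIncrementalFetch() {
        return currentLogEndOffset != lastPropagatedLogEndOffset
            || currentLogStartOffset != lastPropagatedLogStartOffset;
    }
}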
Hi Dong, > The leader uses logStartOffset of the followers to determine the > logStartOffset of the partition. It is needed to handle > DeleteRecordsRequest. It can be changed if the log is deleted on the > follower due to log retention. Is there really a big advantage to the leader caching the LSO for each follower? I guess it allows you to avoid sending the DeleteRecordsRequest to followers that you know have already deleted the records in question. But the leader can just broadcast the request to all the followers. This uses less network bandwidth than sending a single batch of records with acks=all. > > > > > > > - It seems that every time the set of partitions in the > > > ReplicaFetcherThread is changed, or if the follower restarts, a new UUID will > > > be generated in the leader and the leader will add a new entry in the > > > in-memory map to map the UUID to a list of partitions (and other metadata > > such as > > > fetch offset). This map will grow over time depending on the > > > frequency of events such as partition movement or broker restart. As you > > mentioned, > > > we probably need to time out entries in this map. But there is also a > > > tradeoff in this timeout -- a large timeout increases memory usage whereas a > > smaller > > > timeout increases the frequency of the full FetchRequest. Could you specify > > > the default value of this timeout and probably also explain how it > > affects > > > the performance of this KIP? > > > > Right, there are definitely some tradeoffs here. > > > > Since fetches happen very frequently, I think even a short UUID cache > > expiration time of a minute or two should already be enough to ensure > > that 99%+ of all fetch requests are incremental fetch requests. I think > > the idea of partitioning the cache per broker is a good one which will > > let us limit memory consumption even more. > > > > If replica fetcher threads do change their partition assignments often, > > we could also add a special "old UUID to uncache" field to the > > FetchRequest as well. That would avoid having to wait for the full > > minute to clear the UUID cache. That's probably not necessary, > > though... > > > > I think an expiration time of a minute or two is probably reasonable. Yeah > we > can discuss it further after the KIP is updated. Thanks! > > > > > > > Also, do you think we can avoid having duplicate > > > entries from the same ReplicaFetcher (in case of a partition set change) by > > > using brokerId+fetcherThreadIndex as the UUID? > > > > My concern about that is that if two messages get reordered somehow, or > > an update gets lost, the view of partitions which the fetcher thread has > > could diverge from the view which the leader has. Also, UUIDs work for > > consumers, but clearly consumers cannot use a > > brokerId+fetcherThreadIndex. It's simpler to have one system than two. > > > > Yeah this can be a problem if two messages are lost or reordered somehow. > I > am just wondering whether there actually exists a scenario where the > messages can be reordered between the ReplicaFetcherThread and the leader. My > gut > feel is that since the ReplicaFetcherThread talks to the leader using a > single > TCP connection with inflight requests = 1, out-of-order delivery probably > should not happen. I may be wrong though. What do you think? It's not necessarily a single TCP connection, though... we re-establish the connection when required. 
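For illustration, a rough Java sketch of the kind of per-broker session cache with a one-to-two-minute expiration being discussed above; every name here is made up for the example, and a real implementation would also need to bound and evict entries per broker:

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public final class FetchSessionCache {
    private static final long EXPIRATION_MS = 2 * 60 * 1000L; // "a minute or two"

    private static final class Session {
        // topic-partition -> next fetch offset remembered for this fetcher
        final Map<String, Long> fetchOffsets = new ConcurrentHashMap<>();
        volatile long lastUsedMs;
        Session(long nowMs) { this.lastUsedMs = nowMs; }
    }

    // Partitioned per follower broker id so one broker cannot dominate the cache.
    private final Map<Integer, Map<UUID, Session>> sessionsByBroker = new ConcurrentHashMap<>();

    public UUID createSession(int brokerId, long nowMs) {
        UUID id = UUID.randomUUID(); // chosen by the leader, so it is known to be unique
        sessionsByBroker
            .computeIfAbsent(brokerId, b -> new ConcurrentHashMap<>())
            .put(id, new Session(nowMs));
        return id;
    }

    // Returns false if the session expired or never existed; the caller then
    // falls back to a full FetchRequest.
    public boolean touch(int brokerId, UUID id, long nowMs) {
        Session s = sessionsByBroker.getOrDefault(brokerId, Map.of()).get(id);
        if (s == null) return false;
        s.lastUsedMs = nowMs;
        return true;
    }

    public void expireOldSessions(long nowMs) {
        sessionsByBroker.values().forEach(m ->
            m.values().removeIf(s -> nowMs - s.lastUsedMs > EXPIRATION_MS));
    }
}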
I also suspect that we don't always process requests strictly in the order they came in, due to using things like multiple worker threads that operate in parallel. > > > > > > > > > > I agree with the previous comments that 1) ideally we want to evolve the > > > existing FetchRequest instead of adding a new request type; and > > > 2) the KIP hopefully can also apply to replication services such as > > > MirrorMaker. In addition, ideally we probably want to implement the new > > > logic in a separate class without having to modify the existing classes > > > (e.g. Log, LogManager) so that the implementation and design can be > > simpler > > > going forward. Motivated by these concepts, I am wondering if the > > following > > > alternative design may be worth considering. > > > > > > Here are the details of a potentially feasible alternative approach. > > > > > > *Protocol change: * > > > > > > - We add a fetcherId of string type in the FetchRequest. This fetcherId > > > is similar to a UUID and helps the leader correlate the fetcher (i.e. > > > ReplicaFetcherThread or MM consumer) with the state of the fetcher. This > > > fetcherId is determined by the fetcher. For most consumers this fetcherId > > > is null. For the ReplicaFetcherThread this fetcherId = brokerId + > > > threadIndex. > > > For MM this is groupId+someIndex. > > > > As Jay pointed out earlier, there are other consumers besides > > MirrorMaker that might want to take advantage of incremental fetch > > requests. He gave the example of the HDFS connector, but there are many > > others that might want to follow a lot of partitions. So I don't think > > we should special-case MirrorMaker. > > > > Yeah there are indeed many other use-cases of replication. MM is just > one > example. > > > > > > Also, I do not think that the consumer should choose the UUID. If the > > consumer chooses the UUID, then multiple consumers may choose the same > > one, either maliciously or by accident. We don't need to trust the > > client to choose a unique UUID, when the broker can simply choose one > > that it knows is unique. This eliminates a class of bugs which we might > > otherwise encounter. > > > > The groupId is used to determine the partition assignment and the clientId is > used to determine the quota. > > It seems that trusting the UUID from the consumer has the same problem as > trusting the groupId and clientId from the consumer. For example, if a consumer > accidentally or maliciously used the same groupId/clientId as another > consumer, it could already cause problems for either the partition > assignment > of the consumer group or the quota of the clientId. Both problems are > expected to be addressed with authentication. It seems OK to treat the > UUID > with the same trust level as the groupId and we can address this problem > with authentication as well. Does this sound reasonable? I agree that SASL should prevent clients from spoofing each other, when it is in use. However, defense in depth is still a useful security principle. There are also cases where clients could accidentally choose duplicate UUIDs, rather than maliciously: software bugs, misconfigurations, and so forth. These things can have serious consequences when we're dealing with replication. In any case, I don't think there's any advantage to letting the client choose the ID. > > > > > > > > > > *Proposed change in the leader broker:* > > > > > > - A new class FetcherHandler will be used in the leader to map the > > > fetcherId to the state of the fetcher. 
The state of the fetcher is a list of > > > FETCH_REQUEST_PARTITION_V0 for selected partitions. > > > > > > - After the leader receives a FetchRequest, it first transforms the > > > FetchRequest by doing request = FetcherHandler.addPartition(request) > > > before > > > giving this request to KafkaApis.handle(request). If the fetcherId in > > > this request is null, this method does not make any change. Otherwise, it > > > takes the list of FETCH_REQUEST_PARTITION_V0 associated with this > > > fetcherId > > > and appends it to the given request. The state of a new non-null fetcherId > > > is an empty list. > > > > > > - KafkaApis.handle(request) will process the request and generate a > > > response. All existing logic in ReplicaManager, LogManager and so on does > > > not need to be changed. > > > > > > - The leader calls response = FetcherHandler.removePartition(response) > > > before sending the response back to the fetcher. > > > FetcherHandler.removePartition(response) > > > enumerates all partitions in the response. If a partition is "empty" > > > (e.g. > > > no records to be sent), this partition and its FETCH_REQUEST_PARTITION_V0 > > > from the original FetchRequest are added to the state of this fetcherId, > > > and > > > the partition is removed from the response. If the partition is not > > > "empty", the partition is removed from the state of this fetcherId. > > > > > > *Proposed change in the ReplicaFetcherThread:* > > > > > > - In addition to the set of assigned partitions, the ReplicaFetcherThread > > > also keeps track of the subset of assigned partitions which were non-empty > > > in the last FetchResponse. This is initialized to be the set of assigned > > > partitions. Then it is updated every time a FetchResponse is received. > > > The > > > FetchRequest constructed by the ReplicaFetcherThread includes exactly this > > > subset of assigned partitions. > > > > > > Here is how it works. Say there are 100 partitions (from 0 to 99) and > > > initially partition 0 has new data. > > > > > > - The ReplicaFetcherThread will initially send a FetchRequest for all 100 > > > partitions. > > > - KafkaApis will return a FetchResponse containing all 100 partitions. > > > Partition 0 has data but the other 99 partitions are empty. > > > - FetcherHandler will map this fetcherId to a list of 99 partitions > > > together with the related fields in FETCH_REQUEST_PARTITION_V0, e.g. fetch > > > offset. FetcherHandler will then remove the 99 empty partitions from the > > > response so that the response only contains partition 0. > > > - The ReplicaFetcherThread receives a response containing only partition 0. > > > The > > > next FetchRequest will contain only partition 0. > > > > > > The design seems to work and can also handle the case where a partition > > > switches between active and inactive states. Do you think this would > > > address > > > the concern in the previous email (e.g. evolve the existing protocol) > > > properly? > > > > Thanks for the sketch-- it's very interesting. > > > > Hmm. A lot of this sounds like implementation details which are > > probably better to discuss in the JIRA. Also, it's not clear to me why > > avoiding changes to existing classes (such as Log) is desirable-- is > > there a specific concern you have here? > > > Yeah this is indeed implementation detail. I am providing this mostly to > show that it may be possible to evolve the existing FetchRequest without > having two code paths for it, which can probably address the concern that you > mentioned earlier with evolving the FetchRequest. 
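To make the prose sketch above easier to follow, here is a heavily simplified Java rendering of the FetcherHandler bookkeeping, with plain maps standing in for the real FetchRequest/FetchResponse types; the method names and signatures are illustrative only:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public final class FetcherHandler {
    // fetcherId -> (partition -> fetch offset) remembered from previous rounds
    private final Map<String, Map<String, Long>> savedPartitions = new HashMap<>();

    // Called before KafkaApis.handle(): merge the remembered partitions into the request.
    public synchronized Map<String, Long> addPartitions(String fetcherId, Map<String, Long> requested) {
        if (fetcherId == null) return requested;
        Map<String, Long> merged = new HashMap<>(requested);
        merged.putAll(savedPartitions.computeIfAbsent(fetcherId, id -> new HashMap<>()));
        return merged;
    }

    // Called before sending the response: drop empty partitions from the response and
    // remember them (with their fetch offsets from the merged request) for later rounds.
    public synchronized Set<String> prunePartitions(String fetcherId,
                                                    Map<String, Long> mergedRequestOffsets,
                                                    Map<String, Integer> responseBytes) {
        if (fetcherId == null) return responseBytes.keySet();
        Map<String, Long> saved = savedPartitions.computeIfAbsent(fetcherId, id -> new HashMap<>());
        Set<String> toSend = new HashSet<>();
        for (Map.Entry<String, Integer> e : responseBytes.entrySet()) {
            if (e.getValue() == 0) {
                saved.put(e.getKey(), mergedRequestOffsets.get(e.getKey())); // empty: park it server-side
            } else {
                saved.remove(e.getKey()); // non-empty: the fetcher will ask for it again explicitly
                toSend.add(e.getKey());
            }
        }
        return toSend;
    }
}

The key property is that each partition is either tracked server-side (because it was empty in the last response) or carried explicitly by the fetcher, never both at once.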
We can probably > discuss > further after the KIP is updated to combine the two requests. Sounds good. best, Colin > > Thanks, > Dong > > > > > > > > Thanks for the feedback about re-using the existing FetchRequest. When > > I update the KIP, I will combine the two requests, as you and Jay > > suggested. I think it will avoid some duplication. > > > > cheers, > > Colin > > > > > > > > Thanks! > > > Dong > > > > > > > > > On Thu, Nov 23, 2017 at 2:12 PM, Becket Qin <becket....@gmail.com> > > wrote: > > > > > > > Hi Ismael, > > > > > > > > Yes, you are right. The metadata may not help for the multiple fetch > > thread case or > > > > the consumer case. A session-based approach is probably better in this > > case. > > > > > > > > The optimization of only returning data at the offset index entry > > boundary > > > > may still be worth considering. It also helps improve the index lookup > > in > > > > general. > > > > > > > > @Jun, > > > > Good point about log compacted topics. Perhaps we can make sure the read > > will > > > > always operate on the original segment file even if a compacted log > > > > segment is swapped in. Combining this with the above solution, which > > always > > > > returns the data at the index boundary when possible, it seems we can > > avoid > > > > the additional lookup safely. > > > > > > > > Thanks, > > > > > > > > Jiangjie (Becket) Qin > > > > > > > > > > > > On Thu, Nov 23, 2017 at 9:31 AM, Jun Rao <j...@confluent.io> wrote: > > > > > > > > > Yes, caching the log segment position after the index lookup may > > work. > > > > One > > > > > subtle issue is that for a compacted topic, the underlying log > > segment > > > > may > > > > > have changed between two consecutive fetch requests, and we need to > > think > > > > > through the impact of that. > > > > > > > > > > Thanks, > > > > > > > > > > Jun > > > > > > > > > > On Wed, Nov 22, 2017 at 7:54 PM, Colin McCabe <cmcc...@apache.org> > > > > wrote: > > > > > > > > > > > Oh, I see the issue now. The broker uses sendfile() and sends some > > > > > > message data without knowing what the ending offset is. To learn > > that, > > > > > we > > > > > > would need another index access. > > > > > > > > > > > > However, when we do that index->offset lookup, we know that the > > next > > > > > > offset->index lookup (done in the following fetch request) will be > > for > > > > > the > > > > > > same offset. So we should be able to cache the result (the index). > > > > > Also: > > > > > > Does the operating system’s page cache help us here? > > > > > > > > > > > > Best, > > > > > > Colin > > > > > > > > > > > > On Wed, Nov 22, 2017, at 16:53, Jun Rao wrote: > > > > > > > Hi, Colin, > > > > > > > > > > > > > > After step 3a, do we need to update the cached offset in the > > leader > > > > to > > > > > be > > > > > > > the last offset in the data returned in the fetch response? If > > so, we > > > > > > > need > > > > > > > another offset index lookup since the leader only knows that it > > gives > > > > > out > > > > > > > X > > > > > > > bytes in the fetch response, but not the last offset in those X > > > > bytes. 
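Becket's suggestion above of only returning data at offset index entry boundaries could look roughly like this sketch: the leader rounds the response down to the last index entry that fits into the byte budget, so it already knows the follower's next fetch offset without a second lookup. The index format and the numbers are illustrative, not Kafka's real on-disk index.

import java.util.List;

public final class IndexBoundaryFetch {
    public record IndexEntry(long offset, long filePosition) {}

    // Returns the last index entry whose file position still fits within the byte budget,
    // or null if none does (the caller would then fall back to a normal byte-limited read).
    public static IndexEntry fetchUpperBound(List<IndexEntry> index, long startPosition, int maxBytes) {
        IndexEntry bound = null;
        for (IndexEntry e : index) {
            if (e.filePosition() >= startPosition && e.filePosition() - startPosition <= maxBytes) {
                bound = e; // keep advancing while we still fit in the budget
            }
        }
        return bound;
    }

    public static void main(String[] args) {
        List<IndexEntry> index = List.of(
            new IndexEntry(0, 0), new IndexEntry(50, 4096),
            new IndexEntry(100, 8192), new IndexEntry(150, 12288));
        IndexEntry bound = fetchUpperBound(index, 0, 10_000);
        // The leader can now cache offset 100 as the follower's next fetch offset
        // without doing another index lookup after the read.
        System.out.println("Serve data up to offset " + bound.offset());
    }
}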
> > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > Jun > > > > > > > > > > > > > > On Wed, Nov 22, 2017 at 4:01 PM, Colin McCabe < > > cmcc...@apache.org> > > > > > > wrote: > > > > > > > > > > > > > > > On Wed, Nov 22, 2017, at 14:09, Jun Rao wrote: > > > > > > > > > Hi, Colin, > > > > > > > > > > > > > > > > > > When fetching data for a partition, the leader needs to > > translate > > > > > the > > > > > > > > > fetch offset to a position in a log segment with an index > > lookup. > > > > > If > > > > > > the > > > > > > > > fetch > > > > > > > > > request now also needs to cache the offset for the next fetch > > > > > > request, > > > > > > > > > there will be an extra offset index lookup. > > > > > > > > > > > > > > > > Hmm. So the way I was thinking about it was, with an > > incremental > > > > > fetch > > > > > > > > request, for each partition: > > > > > > > > > > > > > > > > 1a. the leader consults its cache to find the offset it needs > > to > > > > use > > > > > > for > > > > > > > > the fetch request > > > > > > > > 2a. the leader performs a lookup to translate the offset to a > > file > > > > > > index > > > > > > > > 3a. the leader reads the data from the file > > > > > > > > > > > > > > > > In contrast, with a full fetch request, for each partition: > > > > > > > > > > > > > > > > 1b. the leader looks at the FetchRequest to find the offset it > > > > needs > > > > > to > > > > > > > > use for the fetch request > > > > > > > > 2b. the leader performs a lookup to translate the offset to a > > file > > > > > > index > > > > > > > > 3b. the leader reads the data from the file > > > > > > > > > > > > > > > > It seems like there is only one offset index lookup in both > > cases? > > > > > The > > > > > > > > key point is that the cache in step #1a is not stored on > > disk. Or > > > > > > maybe > > > > > > > > I'm missing something here. > > > > > > > > > > > > > > > > best, > > > > > > > > Colin > > > > > > > > > > > > > > > > > > > > > > > > > The offset index lookup can > > > > > > > > > potentially be expensive since it could require disk I/Os. > > One > > > > way > > > > > to > > > > > > > > > optimize this a bit is to further cache the log segment > > position > > > > > for > > > > > > the > > > > > > > > > next offset. The tricky issue is that for a compacted topic, > > the > > > > > > > > > underlying > > > > > > > > > log segment could have changed between two consecutive fetch > > > > > > requests. We > > > > > > > > > could potentially make that case work, but the logic will be > > more > > > > > > > > > complicated. > > > > > > > > > > > > > > > > > > Another thing is that it seems that the proposal only saves > > the > > > > > > metadata > > > > > > > > > overhead if there are low volume topics. If we use Jay's > > > > suggestion > > > > > > of > > > > > > > > > including 0 partitions in subsequent fetch requests, it seems > > > > that > > > > > we > > > > > > > > > could > > > > > > > > > get the metadata saving even if all topics have continuous > > > > traffic. 
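One way to picture the "further cache the log segment position for the next offset" idea, including invalidation when a compacted segment is swapped in, is this hypothetical sketch (not actual broker code):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public final class FetchPositionCache {
    public record CachedPosition(long segmentBaseOffset, long nextFetchOffset, long filePosition) {}

    private final Map<String, CachedPosition> byPartition = new ConcurrentHashMap<>();

    // After serving a fetch, remember which file position the follower's next fetch offset maps to.
    public void update(String topicPartition, long segmentBaseOffset, long nextFetchOffset, long filePosition) {
        byPartition.put(topicPartition, new CachedPosition(segmentBaseOffset, nextFetchOffset, filePosition));
    }

    // Returns the cached file position, or -1 to signal that a normal offset index lookup is needed.
    public long lookup(String topicPartition, long segmentBaseOffset, long fetchOffset) {
        CachedPosition c = byPartition.get(topicPartition);
        if (c == null
                || c.segmentBaseOffset() != segmentBaseOffset   // segment replaced, e.g. by log compaction
                || c.nextFetchOffset() != fetchOffset) {        // the fetcher asked for a different offset
            return -1;
        }
        return c.filePosition();
    }
}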
> > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > > > > Jun > > > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, Nov 22, 2017 at 1:14 PM, Colin McCabe < > > > > cmcc...@apache.org> > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > On Tue, Nov 21, 2017, at 22:11, Jun Rao wrote: > > > > > > > > > > > Hi, Jay, > > > > > > > > > > > > > > > > > > > > > > I guess in your proposal the leader has to cache the last > > > > > offset > > > > > > > > given > > > > > > > > > > > back for each partition so that it knows from which > > offset to > > > > > > serve > > > > > > > > the > > > > > > > > > > next > > > > > > > > > > > fetch request. > > > > > > > > > > > > > > > > > > > > Hi Jun, > > > > > > > > > > > > > > > > > > > > Just to clarify, the leader has to cache the last offset > > for > > > > each > > > > > > > > > > follower / UUID in the original KIP-227 proposal as well. > > > > Sorry > > > > > if > > > > > > > > that > > > > > > > > > > wasn't clear. > > > > > > > > > > > > > > > > > > > > > This is doable but it means that the leader needs to do > > an > > > > > > > > > > > additional index lookup per partition to serve a fetch > > > > request. > > > > > > Not > > > > > > > > sure > > > > > > > > > > > if the benefit from the lighter fetch request obviously > > > > offsets > > > > > > the > > > > > > > > > > > additional index lookup though. > > > > > > > > > > > > > > > > > > > > The runtime impact should be a small constant factor at > > most, > > > > > > right? > > > > > > > > > > You would just have a mapping between UUID and the latest > > > > offset > > > > > in > > > > > > > > each > > > > > > > > > > partition data structure. It seems like the runtime > > impact of > > > > > > looking > > > > > > > > > > up the fetch offset in a hash table (or small array) in the > > > > > > in-memory > > > > > > > > > > partition data structure should be very similar to the > > runtime > > > > > > impact > > > > > > > > of > > > > > > > > > > looking up the fetch offset in the FetchRequest. > > > > > > > > > > > > > > > > > > > > The extra memory consumption per partition is > > O(num_brokers), > > > > > > which is > > > > > > > > > > essentially a small constant. (The fact that brokers can > > have > > > > > > multiple > > > > > > > > > > UUIDs due to parallel fetches is a small wrinkle. But we > > can > > > > > > place an > > > > > > > > > > upper bound on the number of UUIDs permitted per broker.) > > > > > > > > > > > > > > > > > > > > best, > > > > > > > > > > Colin > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > > > > > > > > Jun > > > > > > > > > > > > > > > > > > > > > > On Tue, Nov 21, 2017 at 7:03 PM, Jay Kreps < > > j...@confluent.io > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > I think the general thrust of this makes a ton of > > sense. > > > > > > > > > > > > > > > > > > > > > > > > I don't love that we're introducing a second type of > > fetch > > > > > > > > request. I > > > > > > > > > > think > > > > > > > > > > > > the motivation is for compatibility, right? But isn't > > that > > > > > what > > > > > > > > > > versioning > > > > > > > > > > > > is for? Basically to me although the modification we're > > > > > making > > > > > > > > makes > > > > > > > > > > sense, > > > > > > > > > > > > the resulting protocol doesn't really seem like > > something > > > > you > > > > > > would > > > > > > > > > > design > > > > > > > > > > > > this way from scratch. 
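The per-partition bookkeeping Colin describes, a small map from fetch-session UUID to the next fetch offset with an upper bound per broker, might look roughly like the sketch below; the bound of four sessions per broker is an arbitrary placeholder:

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public final class PartitionFetchOffsets {
    private static final int MAX_SESSIONS_PER_BROKER = 4; // hypothetical upper bound

    private record SessionKey(int brokerId, UUID sessionId) {}

    // One entry per follower session, so memory grows as O(number of brokers) per partition.
    private final Map<SessionKey, Long> nextFetchOffsets = new ConcurrentHashMap<>();

    public boolean record(int brokerId, UUID sessionId, long nextFetchOffset) {
        SessionKey key = new SessionKey(brokerId, sessionId);
        long existing = nextFetchOffsets.keySet().stream()
            .filter(k -> k.brokerId() == brokerId).count();
        if (!nextFetchOffsets.containsKey(key) && existing >= MAX_SESSIONS_PER_BROKER) {
            return false; // too many parallel sessions from this broker; fall back to full fetches
        }
        nextFetchOffsets.put(key, nextFetchOffset);
        return true;
    }

    public Long nextFetchOffset(int brokerId, UUID sessionId) {
        return nextFetchOffsets.get(new SessionKey(brokerId, sessionId));
    }
}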
> > > > > > > > > > > > > > > > > > > > > > > > I think I may be misunderstanding the semantics of the > > > > > > partitions > > > > > > > > in > > > > > > > > > > > > IncrementalFetchRequest. I think the intention is that > > the > > > > > > server > > > > > > > > > > remembers > > > > > > > > > > > > the partitions you last requested, and the partitions > > you > > > > > > specify > > > > > > > > in > > > > > > > > > > the > > > > > > > > > > > > request are added to this set. This is a bit odd though > > > > > > because you > > > > > > > > > > can add > > > > > > > > > > > > partitions but I don't see how you remove them, so it > > > > doesn't > > > > > > > > really > > > > > > > > > > let > > > > > > > > > > > > you fully make changes incrementally. I suspect I'm > > > > > > > > misunderstanding > > > > > > > > > > that > > > > > > > > > > > > somehow, though. You'd also need to be a little bit > > careful > > > > > > that > > > > > > > > there > > > > > > > > > > was > > > > > > > > > > > > no way for the server's idea of what the client is > > > > interested > > > > > > in > > > > > > > > and > > > > > > > > > > the > > > > > > > > > > > > client's idea to ever diverge as you made these > > > > modifications > > > > > > over > > > > > > > > time > > > > > > > > > > > > (due to bugs or whatever). > > > > > > > > > > > > > > > > > > > > > > > > It seems like an alternative would be to not add a > > second > > > > > > request, > > > > > > > > but > > > > > > > > > > > > instead change the fetch api and implementation > > > > > > > > > > > > > > > > > > > > > > > > 1. We save the partitions you last fetched on that > > > > > > connection > > > > > > > > in the > > > > > > > > > > > > session for the connection (as I think you are > > > > proposing) > > > > > > > > > > > > 2. It only gives you back info on partitions that > > have > > > > > data > > > > > > or > > > > > > > > have > > > > > > > > > > > > changed (no reason you need the others, right?) > > > > > > > > > > > > 3. Not specifying any partitions means "give me the > > > > > usual", > > > > > > as > > > > > > > > > > defined > > > > > > > > > > > > by whatever you requested before attached to the > > > > session. > > > > > > > > > > > > > > > > > > > > > > > > This would be a new version of the fetch API, so > > > > > compatibility > > > > > > > > would be > > > > > > > > > > > > retained by retaining the older version as is. > > > > > > > > > > > > > > > > > > > > > > > > This seems conceptually simpler to me. It's true that > > you > > > > > have > > > > > > to > > > > > > > > > > resend > > > > > > > > > > > > the full set whenever you want to change it, but that > > > > > actually > > > > > > > > seems > > > > > > > > > > less > > > > > > > > > > > > error prone and that should be rare. > > > > > > > > > > > > > > > > > > > > > > > > I suspect you guys thought about this and it doesn't > > quite > > > > > > work, > > > > > > > > but > > > > > > > > > > maybe > > > > > > > > > > > > you could explain why? 
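Jay's session semantics can be sketched in a few lines: an empty partition list in the request means "the same partitions as last time", and only partitions that actually have data come back in the response. This is a toy illustration of the idea, not the real fetch path:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public final class FetchSession {
    // partition -> fetch offset attached to this connection's session
    private Map<String, Long> sessionPartitions = new HashMap<>();

    // An empty request means "give me the usual"; a non-empty one replaces the session set.
    public Map<String, Long> resolveRequest(Map<String, Long> requested) {
        if (!requested.isEmpty()) {
            sessionPartitions = new HashMap<>(requested);
        }
        return sessionPartitions;
    }

    // Only partitions with data (or changed state) need to appear in the response.
    public Set<String> partitionsToReturn(Map<String, Integer> availableBytes) {
        Set<String> result = new HashSet<>();
        for (Map.Entry<String, Integer> e : availableBytes.entrySet()) {
            if (e.getValue() > 0) result.add(e.getKey());
        }
        return result;
    }
}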
> > > > > > > > > > > > > > > > > > > > > > > > -Jay > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Nov 21, 2017 at 1:02 PM, Colin McCabe < > > > > > > cmcc...@apache.org> > > > > > > > > > > wrote: > > > > > > > > > > > > > Hi all, > > > > > > > > > > > > > I created a KIP to improve the scalability and > > latency of > > > > > > > > > > FetchRequest: > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability > > > > > > > > > > > > > > > > > > > > > > > > > > Please take a look. > > > > > > > > > > > > > cheers, > > > > > > > > > > > > > Colin