On Wed, Nov 22, 2017, at 16:01, Jay Kreps wrote:
> Hey Colin,
>
> WRT memory management I think what you are saying is that you would add a field to the fetch request which would request that the server cache the set of partitions and the response would have a field indicating whether that happened or not. This would allow a bound on memory.
Yeah.

> I was also thinking there could be mechanical improvements that would help efficiency such as sharing topic name or TopicPartition objects to reduce the footprint in a flyweight style. If you think about it there is already some memory overhead on a per-connection basis for socket buffers and purgatory so a little more might be okay.

We could just implement our own version of String#intern. Apparently the default one is really bad, but you could probably create a much better one with ConcurrentHashMap (rough sketch below). See https://stackoverflow.com/questions/10624232/performance-penalty-of-string-intern . Strings are just one thing, though: there is a lot of other stuff like builders, partition objects, serde containers, temporary objects Scala creates, and so on. Algorithmic improvements are a lot more exciting than micro-optimizations here, I think.

P.S. Hopefully, newer GCs like Shenandoah will improve our GC performance. [ https://wiki.openjdk.java.net/display/shenandoah/Main ]

regards,
Colin

> -Jay
>
> On Wed, Nov 22, 2017 at 1:46 PM, Colin McCabe <cmcc...@apache.org> wrote:
>
> > On Wed, Nov 22, 2017, at 13:43, Colin McCabe wrote:
> > > On Wed, Nov 22, 2017, at 13:08, Jay Kreps wrote:
> > > > Okay yeah, what I said didn't really work or make sense. Ismael's interpretation is better.
> > > >
> > > > Couple of things to point out:
> > > >
> > > > 1. I'm less sure that replication has a high partition count and consumers don't. There are definitely use cases for consumers that subscribe to everything (e.g. load all my data into HDFS) as well as super high partition count topics. In a bigger cluster it is unlikely a given node is actually replicating that many partitions from another particular node (though perhaps in aggregate the effect is the same). I think it would clearly be desirable to have a solution that targeted both the consumer and replication if that were achievable.
> > >
> > > Hmm. I hadn't considered the possibility that consumers might want to subscribe to a huge number of topics. That's a fair point (especially with the replication example).
> > >
> > > > I agree with the concern on memory, but perhaps there could be a way to be smart about the memory usage?
> > >
> > > One approach would be to let clients compete for a configurable number of cache slots on the broker. So only the first N clients to ask for an incremental fetch request UUID would receive one. You could combine this with making the clients not request an incremental fetch request unless they were following more than some configurable number of partitions (like 10). That way you wouldn't waste all your cache slots on clients that were only following 1 or 2 partitions, and hence wouldn't benefit much from the optimization.
> >
> > By the way, I was envisioning the cache slots as something that would time out. So if a client created an incremental fetch UUID and then disappeared, we'd eventually purge its cached offsets and let someone else use the memory.
> >
> > > This is basically a bet on the idea that if you have clients following a huge number of partitions, you probably will only have a limited number of such clients. Arguably, if you have a huge number of clients following a huge number of partitions, you are going to have performance problems anyway.
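(To make the String#intern point above concrete, here is a minimal sketch of a ConcurrentHashMap-based interner. The class and method names are hypothetical, not anything from the Kafka code base.)

    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical sketch of a ConcurrentHashMap-based interner, as an
    // alternative to String#intern.  Not actual Kafka code.
    public final class SimpleInterner {
        private final ConcurrentHashMap<String, String> pool = new ConcurrentHashMap<>();

        // Return a canonical instance of the given string, so that repeated
        // topic names (for example) can all share one String object.
        public String intern(String s) {
            String prev = pool.putIfAbsent(s, s);
            return prev != null ? prev : s;
        }
    }

    // Usage (hypothetical): topic = interner.intern(topicNameFromRequest);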
> > > > 2. For the question of one request vs two, one difference in values here may be that it sounds like you are proposing a less ideal protocol to simplify the broker code. To me the protocol is really *the* fundamental interface in Kafka and we should really strive to make that something that is beautiful and makes sense on its own (without needing to understand the history of how we got there). I think there may well be such an explanation for the two-API version (as you kind of said with your HDFS analogy) but really making it clear how these two APIs are different and how they interact is key. Like, basically I think we should be able to explain it from scratch in such a way that it is obvious you'd have these two things as the fundamental primitives for fetching data.
> > >
> > > I can see some arguments for having a single API. One is that both incremental and full fetch requests will travel along a similar code path. There will also be a lot of the same fields in both the request and the response. Separating the APIs means duplicating those fields (like max_wait_time, min_bytes, isolation_level, etc.).
> > >
> > > The argument for having two APIs is that some fields will be present in incremental requests and not in full ones, and vice versa. For example, incremental requests will have a UUID, whereas full requests will not. And clearly, the interpretation of some fields will be a bit different. For example, incremental requests will only return information about changed partitions, whereas full requests will return information about all partitions in the request.
> > >
> > > On the whole, maybe having a single API makes more sense? There really would be a lot of duplicated fields if we split the APIs.
> > >
> > > best,
> > > Colin
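(For illustration of the single-API option discussed just above: a combined request might end up shaped roughly like the following. Every class and field name here is a hypothetical sketch, not taken from the KIP or the Kafka source.)

    import java.util.List;

    // Hypothetical sketch of one FetchRequest type doing double duty as both
    // a full and an incremental fetch.  Names are illustrative only.
    public class CombinedFetchRequest {
        public static class PartitionFetchInfo {
            String topic;
            int partition;
            long fetchOffset;
            int maxBytes;
        }

        // Fields shared by full and incremental fetches.
        int maxWaitTimeMs;
        int minBytes;
        byte isolationLevel;

        // A session/UUID-style field: absent (zero) could mean "full fetch";
        // present could mean "incremental fetch against the partition set the
        // broker has cached for this session".
        long fetchSessionId;

        // Full fetch: the complete set of partitions to follow.
        // Incremental fetch: could be left empty, meaning "same set as before".
        List<PartitionFetchInfo> partitions;
    }

(The only point of the sketch is that the shared fields appear once, and a session field plus the empty-partition-list convention is enough to distinguish the two cases.)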
> > > >
> > > > -Jay
> > > >
> > > > On Wed, Nov 22, 2017 at 11:02 AM, Colin McCabe <cmcc...@apache.org> wrote:
> > > >
> > > > > Hi Jay,
> > > > >
> > > > > On Tue, Nov 21, 2017, at 19:03, Jay Kreps wrote:
> > > > > > I think the general thrust of this makes a ton of sense.
> > > > > >
> > > > > > I don't love that we're introducing a second type of fetch request. I think the motivation is for compatibility, right? But isn't that what versioning is for? Basically to me although the modification we're making makes sense, the resulting protocol doesn't really seem like something you would design this way from scratch.
> > > > >
> > > > > I think there are two big reasons to consider separating IncrementalFetchRequest from FetchRequest.
> > > > >
> > > > > As you say, the first reason is compatibility. We will have to support the full FetchRequest for a long time to come because of our compatibility policy. It would be good from a code quality point of view to avoid having widely diverging code paths for different versions of this request.
> > > > >
> > > > > The other reason is that conceptually I feel that there should be both full and incremental fetch requests. This is similar to how HDFS has both incremental and full block reports. The full reports are necessary when a node is restarted. In HDFS, they also serve as a periodic sanity check in case the DataNode's view of what blocks exist has become desynchronized from the NameNode's view. While in theory you could avoid the sanity check, in practice it often was important.
> > > > >
> > > > > Also, just to be clear, I don't think we should convert KafkaConsumer to using incremental fetch requests. It seems inadvisable to allocate broker memory for each KafkaConsumer. After all, there can be quite a few consumers, and we don't know ahead of time how many there will be. This is very different from brokers, of which there is a small, more-or-less constant number. Also, consumers tend not to consume from a massive number of topics all at once, so I don't think they have the same problems with the existing FetchRequest RPC as followers do.
> > > > >
> > > > > > I think I may be misunderstanding the semantics of the partitions in IncrementalFetchRequest. I think the intention is that the server remembers the partitions you last requested, and the partitions you specify in the request are added to this set. This is a bit odd though because you can add partitions but I don't see how you remove them, so it doesn't really let you fully make changes incrementally. I suspect I'm misunderstanding that somehow, though.
> > > > >
> > > > > Sorry, I may have done a poor job explaining the proposal. The intention is that you cannot change the set of partitions you are receiving information about except by making a full FetchRequest. If you need to make any changes to the watch set whatsoever, you must make a full request, not an incremental one. The idea is that changes are very infrequent, so we don't need to optimize this at the moment.
> > > > >
> > > > > > You'd also need to be a little bit careful that there was no way for the server's idea of what the client is interested in and the client's idea to ever diverge as you made these modifications over time (due to bugs or whatever).
> > > > > >
> > > > > > It seems like an alternative would be to not add a second request, but instead change the fetch API and implementation:
> > > > > >
> > > > > > 1. We save the partitions you last fetched on that connection in the session for the connection (as I think you are proposing)
> > > > > > 2. It only gives you back info on partitions that have data or have changed (no reason you need the others, right?)
> > > > > > 3. Not specifying any partitions means "give me the usual", as defined by whatever you requested before attached to the session.
> > > > > >
> > > > > > This would be a new version of the fetch API, so compatibility would be retained by retaining the older version as is.
> > > > > >
> > > > > > This seems conceptually simpler to me. It's true that you have to resend the full set whenever you want to change it, but that actually seems less error prone and that should be rare.
> > > > > >
> > > > > > I suspect you guys thought about this and it doesn't quite work, but maybe you could explain why?
> > > > >
> > > > > I think your proposal is actually closer to what I was intending than you thought. Like I said above, I believe watch-set-change operations should require a full fetch request. It is certainly simpler to implement and understand.
> > > > >
> > > > > If I understand your proposal correctly, you are suggesting that the existing FetchRequest RPC should be able to do double duty as either a full or an incremental request?
> > > > >
> > > > > best,
> > > > > Colin
> > > > >
> > > > > > -Jay
> > > > > >
> > > > > > On Tue, Nov 21, 2017 at 1:02 PM, Colin McCabe <cmcc...@apache.org> wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I created a KIP to improve the scalability and latency of FetchRequest:
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
> > > > > > >
> > > > > > > Please take a look.
> > > > > > >
> > > > > > > cheers,
> > > > > > > Colin
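(Purely as an illustration of the bounded "cache slots that time out" idea discussed earlier in the thread: a broker-side fetch-session cache might look roughly like the sketch below. All class, method, and field names are hypothetical and not taken from the KIP or the Kafka code base.)

    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical sketch of a broker-side cache of incremental-fetch sessions
    // with a bounded number of slots and time-based expiry.  Not actual Kafka code.
    public class FetchSessionCache {
        static class Session {
            final List<String> partitions;  // the partition set the client is following
            long lastUsedMs;                // refreshed on every incremental fetch
            Session(List<String> partitions, long nowMs) {
                this.partitions = partitions;
                this.lastUsedMs = nowMs;
            }
        }

        private final int maxSlots;
        private final long expiryMs;
        private final Map<Long, Session> sessions = new LinkedHashMap<>();
        private long nextId = 1;

        FetchSessionCache(int maxSlots, long expiryMs) {
            this.maxSlots = maxSlots;
            this.expiryMs = expiryMs;
        }

        // A full fetch may ask for a session.  Returns a session id, or 0 if all
        // slots are taken (the client then simply keeps issuing full fetches).
        synchronized long maybeCreateSession(List<String> partitions, long nowMs) {
            expireOldSessions(nowMs);
            if (sessions.size() >= maxSlots)
                return 0L;
            long id = nextId++;
            sessions.put(id, new Session(partitions, nowMs));
            return id;
        }

        // An incremental fetch looks up its cached partition set.  Returns null if
        // the session expired or never existed, forcing the client back to a full fetch.
        synchronized List<String> partitionsForSession(long sessionId, long nowMs) {
            expireOldSessions(nowMs);
            Session session = sessions.get(sessionId);
            if (session == null)
                return null;
            session.lastUsedMs = nowMs;
            return session.partitions;
        }

        // Purge sessions whose owners have gone away, freeing slots (and memory) for others.
        private void expireOldSessions(long nowMs) {
            Iterator<Map.Entry<Long, Session>> it = sessions.entrySet().iterator();
            while (it.hasNext()) {
                if (nowMs - it.next().getValue().lastUsedMs > expiryMs)
                    it.remove();
            }
        }
    }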