On Wed, Nov 22, 2017, at 13:08, Jay Kreps wrote:
> Okay yeah, what I said didn't really work or make sense. Ismael's
> interpretation is better.
> 
> Couple of things to point out:
> 
>    1. I'm less sure that replication has a high partition count and
>    consumers don't. There are definitely use cases for consumers that
>    subscribe to everything (e.g. load all my data into HDFS) as well as
>    super high partition count topics. In a bigger cluster it is unlikely a
>    given node is actually replicating that many partitions from another
>    particular node (though perhaps in aggregate the effect is the same).
>    I think it would clearly be desirable to have a solution that targeted
>    both the consumer and replication if that were achievable.

Hmm.  I hadn't considered the possibility that consumers might want to
subscribe to a huge number of topics.  That's a fair point (especially
with the replication example).

>    I agree with the concern on memory, but perhaps there could be a way to
>    be smart about the memory usage?

One approach would be to let clients compete for a configurable number
of cache slots on the broker.  So only the first N clients to ask for an
incremental fetch request UUID would receive one.  You could combine
this with making the clients not request an incremental fetch request
unless they were following more than some configurable number of
partitions (like 10).  That way you wouldn't waste all your cache slots
on clients that were only following 1 or 2 partitions, and hence
wouldn't benefit much from the optimization.

This is basically a bet on the idea that if you have clients following a
huge number of partitions, you probably will only have a limited number
of such clients.  Arguably, if you have a huge number of clients
following a huge number of partitions, you are going to have performance
problems anyway.
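
A minimal sketch of the cache-slot idea above, for illustration only. The class and function names (`IncrementalFetchSlotCache`, `try_allocate`, `client_wants_incremental`) and the defaults are hypothetical and do not come from the KIP; the only details taken from the discussion are the fixed number of slots, the first-come-first-served allocation, and the client-side partition threshold.

```python
import uuid
from typing import Dict, FrozenSet, Iterable, Optional


class IncrementalFetchSlotCache:
    """Hypothetical broker-side cache with a configurable number of slots."""

    def __init__(self, max_slots: int) -> None:
        self.max_slots = max_slots
        # Maps an incremental fetch UUID to the partitions that client follows.
        self.sessions: Dict[uuid.UUID, FrozenSet[str]] = {}

    def try_allocate(self, partitions: Iterable[str]) -> Optional[uuid.UUID]:
        """Return a new UUID if a slot is free, otherwise None."""
        if len(self.sessions) >= self.max_slots:
            # All slots taken: the client keeps sending full fetch requests.
            return None
        session_id = uuid.uuid4()
        self.sessions[session_id] = frozenset(partitions)
        return session_id

    def release(self, session_id: uuid.UUID) -> None:
        """Free the slot, for example when the client disconnects."""
        self.sessions.pop(session_id, None)


def client_wants_incremental(num_partitions: int, min_partitions: int = 10) -> bool:
    """Client-side check: only ask for a slot when following more than
    `min_partitions` partitions (the threshold is assumed to be configurable)."""
    return num_partitions > min_partitions
```

With `max_slots=2`, the first two clients that ask receive a UUID and the third receives `None` until one of the earlier slots is released.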

>    2. For the question of one request vs two, one difference in values
>    here may be that it sounds like you are proposing a less ideal protocol to
>    simplify the broker code. To me the protocol is really *the*
>    fundamental interface in Kafka and we should really strive to make that
>    something that is beautiful and makes sense on its own (without needing
>    to understand the history of how we got there). I think there may well
>    be such an explanation for the two-API version (as you kind of said with
>    your HDFS analogy) but really making it clear how these two APIs are 
>    different and how they interact is key. Like, basically I think we should
>    be able to explain it from scratch in such a way that it is obvious you'd
>    have these two things as the fundamental primitives for fetching data.

I can see some arguments for having a single API.  One is that both
incremental and full fetch requests will travel along a similar code
path.  There will also be a lot of the same fields in both the request
and the response.  Separating the APIs means duplicating those fields
(like max_wait_time, min_bytes, isolation_level, etc.)

The argument for having two APIs is that some fields will be present
in incremental requests and not in full ones, and vice versa.  For
example, incremental requests will have a UUID, whereas full requests
will not.  And clearly, the interpretation of some fields will be a bit
different.  For example, incremental requests will only return
information about changed partitions, whereas full requests will return
information about all partitions in the request.

On the whole, maybe having a single API makes more sense?  There really
would be a lot of duplicated fields if we split the APIs.
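
To make the single-API option concrete, here is a rough sketch, assuming one request type that carries the shared fields plus an optional UUID. The `FetchRequest` dataclass, the `session_id` field name, and `partitions_to_return` are hypothetical names for illustration; only the shared field names (`max_wait_time`, `min_bytes`, `isolation_level`) and the full-versus-incremental semantics come from the discussion above.

```python
import uuid
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set


@dataclass
class FetchRequest:
    # Fields shared by full and incremental requests.
    max_wait_time: int
    min_bytes: int
    isolation_level: int
    # Partition name -> fetch offset; populated on a full request.
    partitions: Dict[str, int] = field(default_factory=dict)
    # Present only on incremental requests (hypothetical field name).
    session_id: Optional[uuid.UUID] = None

    @property
    def is_incremental(self) -> bool:
        return self.session_id is not None


def partitions_to_return(
    request: FetchRequest, watched: Set[str], changed: Set[str]
) -> List[str]:
    """Pick the partitions the response would describe.

    `watched` is the set remembered from the client's last full request and
    `changed` is the set of partitions with new information.
    """
    if request.is_incremental:
        # Incremental: only changed partitions from the remembered set.
        return sorted(p for p in watched if p in changed)
    # Full: every partition named in the request.
    return sorted(request.partitions)
```

In this shape the shared fields appear once, and the UUID is the only thing that distinguishes the two kinds of request.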

best,
Colin

> 
> -Jay
> 
> On Wed, Nov 22, 2017 at 11:02 AM, Colin McCabe <cmcc...@apache.org>
> wrote:
> 
> > Hi Jay,
> >
> > On Tue, Nov 21, 2017, at 19:03, Jay Kreps wrote:
> > > I think the general thrust of this makes a ton of sense.
> > >
> > > I don't love that we're introducing a second type of fetch request. I
> > > think the motivation is for compatibility, right? But isn't that what
> > > versioning is for? Basically, to me, although the modification we're
> > > making makes sense, the resulting protocol doesn't really seem like
> > > something you would design this way from scratch.
> >
> > I think there are two big reasons to consider separating
> > IncrementalFetchRequest from FetchRequest.
> >
> > As you say, the first reason is compatibility.  We will have to support
> > the full FetchRequest for a long time to come because of our
> > compatibility policy.  It would be good from a code quality point of
> > view to avoid having widely diverging code paths for different versions
> > of this request.
> >
> > The other reason is that conceptually I feel that there should be both
> > full and incremental fetch requests.  This is similar to how HDFS has
> > both incremental and full block reports.  The full reports are necessary
> > when a node is restarted.  In HDFS, they also serve as a periodic sanity
> > check in case the DataNode's view of what blocks exist has become
> > desynchronized from the NameNode's view.  While in theory you could
> > avoid the sanity check, in practice it often was important.
> >
> > Also, just to be clear, I don't think we should convert KafkaConsumer to
> > using incremental fetch requests.  It seems inadvisable to allocate
> > broker memory for each KafkaConsumer.  After all, there can be quite a
> > few consumers, and we don't know ahead of time how many there will be.
> > This is very different from brokers, of which there is a small,
> > more-or-less constant number.  Also, consumers tend not to consume from
> > a massive number of topics all at once, so I don't think they have the
> > same problems with the existing FetchRequest RPC as followers do.
> >
> > >
> > > I think I may be misunderstanding the semantics of the partitions in
> > > IncrementalFetchRequest. I think the intention is that the server
> > > remembers the partitions you last requested, and the partitions you
> > > specify in the request are added to this set. This is a bit odd though
> > > because you can add partitions but I don't see how you remove them, so
> > > it doesn't really let you fully make changes incrementally. I suspect
> > > I'm misunderstanding that somehow, though.
> >
> > Sorry, I may have done a poor job explaining the proposal.  The
> > intention is that you cannot change the set of partitions you are
> > receiving information about except by making a full FetchRequest.  If
> > you need to make any changes to the watch set whatsoever, you must make
> > a full request, not an incremental.  The idea is that changes are very
> > infrequent, so we don't need to optimize this at the moment.
> >
> > > You'd also need to be a little bit careful that there was
> > > no way for the server's idea of what the client is interested in and the
> > > client's idea to ever diverge as you made these modifications over time
> > > (due to bugs or whatever).
> > >
> > > It seems like an alternative would be to not add a second request, but
> > > instead change the fetch api and implementation
> > >
> > >    1. We save the partitions you last fetched on that connection in the
> > >    session for the connection (as I think you are proposing)
> > >    2. It only gives you back info on partitions that have data or have
> > >    changed (no reason you need the others, right?)
> > >    3. Not specifying any partitions means "give me the usual", as defined
> > >    by whatever you requested before attached to the session.
> > >
> > > This would be a new version of the fetch API, so compatibility would be
> > > retained by retaining the older version as is.
> > >
> > > This seems conceptually simpler to me. It's true that you have to resend
> > > the full set whenever you want to change it, but that actually seems less
> > > error prone and that should be rare.
> > >
> > > I suspect you guys thought about this and it doesn't quite work, but
> > > maybe you could explain why?
> >
> > I think your proposal is actually closer to what I was intending than
> > you thought.  Like I said above, I believe watch-set-change operations
> > should require a full fetch request.  It is certainly simpler to
> > implement and understand.
> >
> > If I understand your proposal correctly, you are suggesting that the
> > existing FetchRequest RPC should be able to do double duty as either a
> > full or an incremental request?
> >
> > best,
> > Colin
> >
> > >
> > > -Jay
> > >
> > > On Tue, Nov 21, 2017 at 1:02 PM, Colin McCabe <cmcc...@apache.org> wrote:
> > >
> > > > Hi all,
> > > >
> > > > I created a KIP to improve the scalability and latency of FetchRequest:
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
> > > >
> > > > Please take a look.
> > > >
> > > > cheers,
> > > > Colin
> > > >
> >
