>
> Thinking about this again. I do see the reason that we want to have an epoch
> to avoid out-of-order registration of the interested set. But I am
> wondering if the following semantics would meet our needs better:
>  - Session Id: the ID assigned to a single client for its whole lifetime, i.e.
> it does not change when the interested partitions change.
>  - Epoch: the interested set epoch. Only updated when a full fetch request
> comes, which may result in a change to the interested partition set.
> This will ensure that the registered interested set will always be the
> latest registration. And the clients can change the interested partition
> set without creating another session.


I agree this is a bit more intuitive than the sequence number, and the
ability to reuse the session is beneficial since it wastes less of the
cache on sessions waiting to time out. I would say that the epoch should be
controlled by the client, and a bump of the epoch indicates a full fetch
request. The client should also bump the epoch if it fails to receive a
fetch response. This ensures that the broker cannot mistake an old request
for a current one if it arrives after the client has reconnected and sent a
new one, which could otherwise cause an invalid session state.
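
To make the proposal concrete, here is a rough sketch in Java of how these
semantics could fit together. It is only an illustration under my own
assumptions: the names (FetchEpochSketch, Broker.handle, Client.bumpEpoch)
are hypothetical and do not come from the KIP or the Kafka code base, and it
assumes the broker treats any epoch larger than the one it has stored as a
full fetch and rejects any smaller one as stale.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: none of these names come from the KIP or from Kafka.
final class FetchEpochSketch {
    enum Outcome { FULL_FETCH, INCREMENTAL, STALE }

    /** Broker side: remembers the newest epoch seen for each session ID. */
    static final class Broker {
        private final Map<Long, Integer> newestEpoch = new HashMap<>();

        Outcome handle(long sessionId, int epoch) {
            Integer known = newestEpoch.get(sessionId);
            if (known == null || epoch > known) {
                // A bumped epoch marks a full fetch: replace the interested set.
                newestEpoch.put(sessionId, epoch);
                return Outcome.FULL_FETCH;
            }
            if (epoch == known) {
                // Same epoch as the last full fetch: an incremental request.
                return Outcome.INCREMENTAL;
            }
            // Older than the newest epoch: a delayed request from before a bump.
            return Outcome.STALE;
        }
    }

    /** Client side: owns the epoch and bumps it whenever a full fetch is needed. */
    static final class Client {
        private final long sessionId;
        private int epoch = 0;

        Client(long sessionId) { this.sessionId = sessionId; }

        /** Call when the partition set changes or a fetch response is not received. */
        int bumpEpoch() { return ++epoch; }

        long sessionId() { return sessionId; }
    }

    public static void main(String[] args) {
        Broker broker = new Broker();
        Client client = new Client(42L);

        int first = client.bumpEpoch();
        System.out.println(broker.handle(client.sessionId(), first));   // full fetch
        System.out.println(broker.handle(client.sessionId(), first));   // incremental

        int second = client.bumpEpoch();  // e.g. the previous response never arrived
        System.out.println(broker.handle(client.sessionId(), second));  // full fetch
        System.out.println(broker.handle(client.sessionId(), first));   // stale, rejected
    }
}
```

Note that the session ID stays the same across both bumps, so the broker
reuses one cache entry, and the delayed request carrying the old epoch is
rejected rather than applied.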

-Jason


On Tue, Dec 5, 2017 at 9:38 PM, Becket Qin <becket....@gmail.com> wrote:

> Hi Jun,
>
> That is true, but in reality it seems rare that the fetch size is smaller
> than the index interval. In the worst case, we may need to do another lookup.
> In the future, when we have the mechanism to inform the clients about the
> broker configurations, the clients may want to configure correspondingly as
> well, e.g. max message size, max timestamp difference, etc.
>
> On the other hand, we are not guaranteeing that the returned bytes in a
> partition are always bounded by the per-partition fetch size, because we are
> going to return at least one message, so the per-partition fetch size already
> seems to be a soft limit. Since we are introducing a new fetch protocol and
> this is related, it might be worth considering this option.
>
> BTW, one reason I bring this up again is that yesterday we had a
> presentation from Uber regarding end-to-end latency. They are
> seeing this binary search behavior impact latency because the index
> file gets paged in and out.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Tue, Dec 5, 2017 at 5:55 PM, Jun Rao <j...@confluent.io> wrote:
>
> > Hi, Jiangjie,
> >
> > Not sure returning the fetch response at the index boundary is a general
> > solution. The index interval is configurable. If one configures the index
> > interval larger than the per partition fetch size, we probably have to
> > return data not at the index boundary.
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Dec 5, 2017 at 4:17 PM, Becket Qin <becket....@gmail.com> wrote:
> >
> > > Hi Colin,
> > >
> > > Thinking about this again. I do see the reason that we want to have an
> > > epoch to avoid out-of-order registration of the interested set. But I am
> > > wondering if the following semantics would meet our needs better:
> > >  - Session Id: the ID assigned to a single client for its whole lifetime,
> > > i.e. it does not change when the interested partitions change.
> > >  - Epoch: the interested set epoch. Only updated when a full fetch
> > > request comes, which may result in a change to the interested partition set.
> > > This will ensure that the registered interested set will always be the
> > > latest registration. And the clients can change the interested
> > > partition set without creating another session.
> > >
> > > Also I want to bring up the way the leader responds to the FetchRequest
> > > again. I think it would be a big improvement if we just return the
> > > responses at an index entry boundary or the log end. There are a few
> > > benefits:
> > > 1. The leader does not need the follower to provide the offsets.
> > > 2. The fetch requests no longer need to do a binary search on the
> > > index; they just need to do a linear access to the index file, which is
> > > much more cache friendly.
> > >
> > > Assuming the leader can get the last returned offsets to the clients
> > > cheaply, I am still not sure why it is necessary for the followers to
> > > repeat the offsets in the incremental fetch every time. Intuitively it
> > > should only update the offsets when the leader has wrong offsets. In
> > > most cases, the incremental fetch request should just be empty. Otherwise
> > > we may not be saving much when there are continuous small requests going
> > > to each partition, which could be normal for some low latency systems.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > >
> > >
> > >
> > > On Tue, Dec 5, 2017 at 2:14 PM, Colin McCabe <cmcc...@apache.org> wrote:
> > >
> > > > On Tue, Dec 5, 2017, at 13:13, Jan Filipiak wrote:
> > > > > Hi Colin
> > > > >
> > > > > Addressing the topic of how to manage slots from the other thread.
> > > > > With tcp connections all this comes for free essentially.
> > > >
> > > > Hi Jan,
> > > >
> > > > > I don't think that it's accurate to say that cache management "comes
> > > > > for free" by coupling the incremental fetch session with the TCP
> > > > > session.  When a new TCP session is started by a fetch request, you
> > > > > still have to decide whether to grant that request an incremental
> > > > > fetch session or not.  If your answer is that you always grant the
> > > > > request, I would argue that you do not have cache management.
> > > > >
> > > > > I guess you could argue that timeouts are cache management, but I
> > > > > don't find that argument persuasive.  Anyone could just create a lot
> > > > > of TCP sessions and use a lot of resources, in that case.  So there is
> > > > > essentially no limit on memory use.  In any case, TCP sessions don't
> > > > > help us implement fetch session timeouts.
> > > >
> > > > > > I still would argue we disable it by default and make a flag in the
> > > > > > broker to ask the leader to maintain the cache while replicating,
> > > > > > and also only have it optional in consumers (default to off) so one
> > > > > > can turn it on where it really hurts, MirrorMaker and audit
> > > > > > consumers most prominently.
> > > >
> > > > > I agree with Jason's point from earlier in the thread.  Adding extra
> > > > > configuration knobs that aren't really necessary can harm usability.
> > > > > Certainly asking people to manually turn on a feature "where it
> > > > > really hurts" seems to fall in that category, when we could easily
> > > > > enable it automatically for them.
> > > >
> > > > >
> > > > > Otherwise I left a few remarks in-line, which should help to
> > understand
> > > > > my view of the situation better
> > > > >
> > > > > Best Jan
> > > > >
> > > > >
> > > > > On 05.12.2017 08:06, Colin McCabe wrote:
> > > > > > On Mon, Dec 4, 2017, at 02:27, Jan Filipiak wrote:
> > > > > >>
> > > > > >> On 03.12.2017 21:55, Colin McCabe wrote:
> > > > > >>> On Sat, Dec 2, 2017, at 23:21, Becket Qin wrote:
> > > > > >>>> Thanks for the explanation, Colin. A few more questions.
> > > > > >>>>
> > > > > >>>>> The session epoch is not complex.  It's just a number which
> > > > increments
> > > > > >>>>> on each incremental fetch.  The session epoch is also useful
> > for
> > > > > >>>>> debugging-- it allows you to match up requests and responses
> > when
> > > > > >>>>> looking at log files.
> > > > > >>>> Currently each request in Kafka has a correlation id to help
> > match
> > > > the
> > > > > >>>> requests and responses. Is epoch doing something differently?
> > > > > >>> Hi Becket,
> > > > > >>>
> > > > > >>> The correlation ID is used within a single TCP session, to
> > uniquely
> > > > > >>> associate a request with a response.  The correlation ID is not
> > > > unique
> > > > > >>> (and has no meaning) outside the context of that single TCP
> > > session.
> > > > > >>>
> > > > > >>> Keep in mind, NetworkClient is in charge of TCP sessions, and
> > > > generally
> > > > > >>> tries to hide that information from the upper layers of the
> code.
> > > So
> > > > > >>> when you submit a request to NetworkClient, you don't know if
> > that
> > > > > >>> request creates a TCP session, or reuses an existing one.
> > > > > >>>>> Unfortunately, this doesn't work.  Imagine the client misses
> an
> > > > > >>>>> increment fetch response about a partition.  And then the
> > > > partition is
> > > > > >>>>> never updated after that.  The client has no way to know
> about
> > > the
> > > > > >>>>> partition, since it won't be included in any future
> incremental
> > > > fetch
> > > > > >>>>> responses.  And there are no offsets to compare, since the
> > > > partition is
> > > > > >>>>> simply omitted from the response.
> > > > > >>>> I am curious about in which situation would the follower miss
> a
> > > > response
> > > > > >>>> of a partition. If the entire FetchResponse is lost (e.g.
> > > timeout),
> > > > the
> > > > > >>>> follower would disconnect and retry. That will result in
> > sending a
> > > > full
> > > > > >>>> FetchRequest.
> > > > > >>> Basically, you are proposing that we rely on TCP for reliable
> > > > delivery
> > > > > >>> in a distributed system.  That isn't a good idea for a bunch of
> > > > > >>> different reasons.  First of all, TCP timeouts tend to be very
> > > > long.  So
> > > > > >>> if the TCP session timing out is your error detection
> mechanism,
> > > you
> > > > > >>> have to wait minutes for messages to timeout.  Of course, we
> add
> > a
> > > > > >>> timeout on top of that after which we declare the connection
> bad
> > > and
> > > > > >>> manually close it.  But just because the session is closed on
> one
> > > end
> > > > > >>> doesn't mean that the other end knows that it is closed.  So
> the
> > > > leader
> > > > > >>> may have to wait quite a long time before TCP decides that yes,
> > > > > > >>> connection X from the follower is dead and not coming back, even
> > > > > > >>> though gremlins ate the FIN packet which the follower attempted to
> > > > > > >>> transmit.
> > > > > >>> If the cache state is tied to that TCP session, we have to keep
> > > that
> > > > > >>> cache around for a much longer time than we should.
> > > > > >> Hi,
> > > > > >>
> > > > > > >> I see this from a different perspective. The cache expiry time
> > > > > > >> has the same semantics as idle connection time in this scenario.
> > > > > > >> It is the time range in which we expect the client to come back
> > > > > > >> and reuse its broker-side state. I would argue that on close we
> > > > > > >> would get an extra shot at cleaning up the session state early,
> > > > > > >> as opposed to always waiting for that duration for expiry to happen.
> > > > > > Hi Jan,
> > > > > >
> > > > > > The idea here is that the incremental fetch cache expiry time can
> > be
> > > > > > much shorter than the TCP session timeout.  In general the TCP
> > > session
> > > > > > timeout is common to all TCP connections, and very long.  To make
> > > these
> > > > > > numbers a little more concrete, the TCP session timeout is often
> > > > > > configured to be 2 hours on Linux.  (See
> > > > > > > https://www.cyberciti.biz/tips/linux-increasing-or-decreasing-tcp-sockets-timeouts.html
> > > > > > > )  The timeout I was proposing for incremental fetch sessions was
> > > > > > > one or two minutes at most.
> > > > > > Currently this is taken care of by
> > > > > > connections.max.idle.ms on the broker, which defaults to a few
> > > > > > minutes.
> > > >
> > > > It is 10 minutes by default, which is longer than what we want the
> > > > incremental fetch session timeout to be.  There's no reason to couple
> > > > these two things.
> > > >
> > > > > > Also something we could let the client change if we really wanted to.
> > > > > > So there is no need to worry about coupling our implementation to
> > > > > > some timeouts given by the OS. With TCP one always has full control
> > > > > > over the worst times, plus one gets the extra shot at cleaning up
> > > > > > early when the close comes through,
> > > > > > which is the majority of the cases.
> > > >
> > > > In the majority of cases, the TCP session will be re-established.  In
> > > > that case, we have to send a full fetch request rather than an
> > > > incremental fetch request.
> > > >
> > > > >
> > > > > >
> > > > > > >>> Secondly, from a software engineering perspective, it's not a good
> > > > > > >>> idea to try to tightly tie together TCP and our code.  We would have
> > > > > > >>> to rework how we interact with NetworkClient so that we are aware of
> > > > > > >>> things like TCP sessions closing or opening.  We would have to be
> > > > > > >>> careful to preserve the ordering of incoming messages when doing
> > > > > > >>> things like putting incoming requests on to a queue to be processed
> > > > > > >>> by multiple threads.  It's just a lot of complexity to add, and
> > > > > > >>> there's no upside.
> > > > > >> I see the point here. And I had a small chat with Dong Lin
> already
> > > > > >> making me aware of this. I tried out the approaches and propose
> > the
> > > > > >> following:
> > > > > >>
> > > > > > >> The client starts and does a full fetch. It then does incremental
> > > > > > >> fetches.
> > > > > > >> The connection to the broker dies and is re-established by
> > > > > > >> NetworkClient under the hood.
> > > > > > >> The broker sees an incremental fetch without having state =>
> > > > > > >> returns an error.
> > > > > > >> The client sees the error, does a full fetch and goes back to
> > > > > > >> incrementally fetching.
> > > > > >>
> > > > > > >> Having this one additional error round trip is essentially the
> > > > > > >> same as when something with the sessions or epoch changed
> > > > > > >> unexpectedly for the client (say expiry).
> > > > > >>
> > > > > > >> So it's nothing extra added, but the conditions are easier to
> > > > > > >> evaluate, especially since we do everything with NetworkClient.
> > > > > > >> Other implementers of the protocol are free to optimize this and
> > > > > > >> skip the erroneous round trip on the new connection.
> > > > > > >> It's a great plus that the client can know when the error is
> > > > > > >> going to happen, instead of the server always having to report
> > > > > > >> back if something changes unexpectedly for the client.
> > > > > > You are assuming that the leader and the follower agree that the
> > TCP
> > > > > > session drops at the same time.  When there are network problems,
> > > this
> > > > > > may not be true.  The leader may still think the previous TCP
> > session
> > > > is
> > > > > > active.  In that case, we have to keep the incremental fetch
> > session
> > > > > > state around until we learn otherwise (which could be up to that
> 2
> > > hour
> > > > > > timeout I mentioned).  And if we get a new incoming incremental
> > fetch
> > > > > > request, we can't assume that it replaces the previous one,
> because
> > > the
> > > > > > IDs will be different (the new one starts a new session).
> > > > > As mentioned, no reason to fear some time-outs out of our control
> > > > > >
> > > > > >>> Imagine that I made an argument that client IDs are "complex"
> and
> > > > should
> > > > > >>> be removed from our APIs.  After all, we can just look at the
> > > remote
> > > > IP
> > > > > >>> address and TCP port of each connection.  Would you think that
> > was
> > > a
> > > > > >>> good idea?  The client ID is useful when looking at logs.  For
> > > > example,
> > > > > >>> if a rebalance is having problems, you want to know what
> clients
> > > were
> > > > > >>> having a problem.  So having the client ID field to guide you
> is
> > > > > >>> actually much less "complex" in practice than not having an ID.
> > > > > > >> I still can't follow why the correlation ID will not help here.
> > > > > > >> Correlating logs with it usually works great, even with primitive
> > > > > > >> tools like grep.
> > > > > > > The correlation ID does help somewhat, but certainly not as much
> > > > > > > as a unique 64-bit ID.  The correlation ID is not unique in the
> > > > > > > broker, just unique to a single NetworkClient.  Similarly, the
> > > > > > > correlation ID is not unique on the client side, if there are
> > > > > > > multiple Consumers, etc.
> > > > > > Can always bump entropy in correlation IDs, never had a problem
> > > > > > of finding too many duplicates. Would be a different KIP though.
> > > > > >
> > > > > >>> Similarly, if metadata responses had epoch numbers (simple
> > > > incrementing
> > > > > >>> numbers), we would not have to debug problems like clients
> > > > accidentally
> > > > > >>> getting old metadata from servers that had been partitioned off
> > > from
> > > > the
> > > > > >>> network for a while.  Clients would know the difference between
> > old
> > > > and
> > > > > >>> new metadata.  So putting epochs in to the metadata request is
> > much
> > > > less
> > > > > >>> "complex" operationally, even though it's an extra field in the
> > > > request.
> > > > > >>>    This has been discussed before on the mailing list.
> > > > > >>>
> > > > > >>> So I think the bottom line for me is that having the session ID
> > and
> > > > > >>> session epoch, while it adds two extra fields, reduces
> > operational
> > > > > >>> complexity and increases debuggability.  It avoids tightly
> > coupling
> > > > us
> > > > > >>> to assumptions about reliable ordered delivery which tend to be
> > > > violated
> > > > > >>> in practice in multiple layers of the stack.  Finally, it
> avoids
> > > the
> > > > > >>> necessity of refactoring NetworkClient.
> > > > > > >> So there are stacks out there that violate TCP guarantees? And
> > > > > > >> software still works? How can this be? Can you elaborate a little
> > > > > > >> on where this can be violated? I am not very familiar with
> > > > > > >> virtualized environments but they can't really violate TCP
> > > > > > >> contracts.
> > > > > > TCP's guarantees of reliable, in-order transmission certainly can
> > be
> > > > > > violated.  For example, I once had to debug a cluster where a
> > certain
> > > > > > node had a network card which corrupted its transmissions
> > > occasionally.
> > > > > > With all the layers of checksums, you would think that this was
> not
> > > > > > possible, but it happened.  We occasionally got corrupted data
> > > written
> > > > > > to disk on the other end because of it.  Even more frustrating,
> the
> > > > data
> > > > > > was not corrupted on disk on the sending node-- it was a bug in
> the
> > > > > > network card driver that was injecting the errors.
> > > > > > True, but your broker might as well read a corrupted 600 GB as the
> > > > > > size from the network and die with OOM instantly.
> > > >
> > > > > If you read 600 GB as the size from the network, you will not "die
> > > > > with OOM instantly."  That would be a bug.  Instead, you will notice
> > > > > that 600 GB is greater than max.message.bytes, and close the
> > > > > connection.
> > > >
> > > > > > Optimizing for still having functional
> > > > > > software under these circumstances is not reasonable.
> > > > > > You want to get rid of such a
> > > > > > node ASAP and pray that ZooKeeper's ticks get corrupted often enough
> > > > > > that it finally drops out of the cluster.
> > > > >
> > > > > > There is a good reason that these kinds of things
> > > > > > https://issues.apache.org/jira/browse/MESOS-4105
> > > > > > don't end up as Kafka JIRAs. In the end you can't run any software
> > > > > > in these containers anymore. Application layer checksums are a neat
> > > > > > thing to fail fast, but trying to cope with this probably causes
> > > > > > more harm than good.  So I would argue that we shouldn't try this
> > > > > > for the fetch requests.
> > > >
> > > > > One of the goals of Apache Kafka is to be "a streaming platform...
> > > > > [that] lets you store streams of records in a fault-tolerant way."
> > > > > For more information, see https://kafka.apache.org/intro .
> > > > > Fault-tolerance is explicitly part of the goal of Kafka.  Prayer
> > > > > should be optional, not required, when running the software.
> > > > >
> > > > > Crashing because someone sent you a bad packet is not reasonable
> > > > > behavior.  It is a bug.  Similarly, bringing down the whole cluster,
> > > > > which could be a hundred nodes, because someone had a bad network
> > > > > adapter is not reasonable behavior.  It is perhaps reasonable for the
> > > > > cluster to perform worse when hardware is having problems.  But that's
> > > > > a different discussion.
> > > >
> > > > best,
> > > > Colin
> > > >
> > > > >
> > > > >
> > > > > >
> > > > > > However, my point was not about TCP's guarantees being violated.
> > My
> > > > > > point is that TCP's guarantees are only one small building block
> to
> > > > > > build a robust distributed system.  TCP basically just says that
> if
> > > you
> > > > > > get any bytes from the stream, you will get the ones that were
> sent
> > > by
> > > > > > the sender, in the order they were sent.  TCP does not guarantee
> > that
> > > > > > the bytes you send will get there.  It does not guarantee that if
> > you
> > > > > > close the connection, the other end will know about it in a
> timely
> > > > > > fashion.
> > > > > > These are very powerful guarantees and since we use TCP we should
> > > > > > piggyback everything that is reasonable on to it. IMO there is no
> > > > > > need to reimplement correct sequencing again if you get that from
> > > > > > your transport layer. It saves you the complexity, it makes
> > > > > > your application behave way more naturally and your API easier to
> > > > > > understand.
> > > > > >
> > > > > > There is literally nothing the kernel won't let you decide,
> > > > > > especially not any timings. The only noticeable exception is
> > > > > > TIME_WAIT of usually 240 seconds, but that already has little to do
> > > > > > with the broker itself, and
> > > > > > if we are running out of usable ports because of this then expiring
> > > > > > fetch requests
> > > > > > won't help much anyway.
> > > > > >
> > > > > > I hope I could strengthen the trust you have in userland TCP
> > > > > > connection management. It is really powerful and can be exploited
> > > > > > for maximum gains without much risk in my opinion.
> > > > >
> > > > >
> > > > >
> > > > > > It does not guarantee that the bytes will be received in a
> > > > > > certain timeframe, and certainly doesn't guarantee that if you
> > send a
> > > > > > byte on connection X and then on connection Y, that the remote
> end
> > > will
> > > > > > read a byte on X before reading a byte on Y.
> > > > > > No one expects this from two independent paths of any kind.
> > > > >
> > > > > >
> > > > > > best,
> > > > > > Colin
> > > > > >
> > > > > >> Hope this made my view clearer, especially the first part.
> > > > > >>
> > > > > >> Best Jan
> > > > > >>
> > > > > >>
> > > > > >>> best,
> > > > > >>> Colin
> > > > > >>>
> > > > > >>>
> > > > > > >>>> If an error such as NotLeaderForPartition is
> > > > > > >>>> returned for some partitions, the follower can always send a
> > > > > > >>>> full FetchRequest. Is there a scenario in which only some of the
> > > > > > >>>> partitions in a FetchResponse are lost?
> > > > > >>>>
> > > > > >>>> Thanks,
> > > > > >>>>
> > > > > >>>> Jiangjie (Becket) Qin
> > > > > >>>>
> > > > > >>>>
> > > > > > >>>> On Sat, Dec 2, 2017 at 2:37 PM, Colin McCabe <cmcc...@apache.org> wrote:
> > > > > >>>>
> > > > > >>>>> On Fri, Dec 1, 2017, at 11:46, Dong Lin wrote:
> > > > > >>>>>> On Thu, Nov 30, 2017 at 9:37 AM, Colin McCabe<
> > > cmcc...@apache.org>
> > > > > >>>>> wrote:
> > > > > >>>>>>> On Wed, Nov 29, 2017, at 18:59, Dong Lin wrote:
> > > > > >>>>>>>> Hey Colin,
> > > > > >>>>>>>>
> > > > > >>>>>>>> Thanks much for the update. I have a few questions below:
> > > > > >>>>>>>>
> > > > > >>>>>>>> 1. I am not very sure that we need Fetch Session Epoch. It
> > > > seems that
> > > > > >>>>>>>> Fetch
> > > > > >>>>>>>> Session Epoch is only needed to help leader distinguish
> > > between
> > > > "a
> > > > > >>>>> full
> > > > > >>>>>>>> fetch request" and "a full fetch request and request a new
> > > > > >>>>> incremental
> > > > > >>>>>>>> fetch session". Alternatively, follower can also indicate
> "a
> > > > full
> > > > > >>>>> fetch
> > > > > >>>>>>>> request and request a new incremental fetch session" by
> > > setting
> > > > Fetch
> > > > > >>>>>>>> Session ID to -1 without using Fetch Session Epoch. Does
> > this
> > > > make
> > > > > >>>>> sense?
> > > > > >>>>>>> Hi Dong,
> > > > > >>>>>>>
> > > > > >>>>>>> The fetch session epoch is very important for ensuring
> > > > correctness.  It
> > > > > >>>>>>> prevents corrupted or incomplete fetch data due to network
> > > > reordering
> > > > > >>>>> or
> > > > > >>>>>>> loss.
> > > > > >>>>>>>
> > > > > >>>>>>> For example, consider a scenario where the follower sends a
> > > fetch
> > > > > >>>>>>> request to the leader.  The leader responds, but the
> response
> > > is
> > > > lost
> > > > > >>>>>>> because of network problems which affected the TCP session.
> > In
> > > > that
> > > > > >>>>>>> case, the follower must establish a new TCP session and
> > re-send
> > > > the
> > > > > >>>>>>> incremental fetch request.  But the leader does not know
> that
> > > the
> > > > > >>>>>>> follower didn't receive the previous incremental fetch
> > > > response.  It is
> > > > > >>>>>>> only the incremental fetch epoch which lets the leader know
> > > that
> > > > it
> > > > > >>>>>>> needs to resend that data, and not data which comes
> > afterwards.
> > > > > >>>>>>>
> > > > > >>>>>>> You could construct similar scenarios with message
> > reordering,
> > > > > >>>>>>> duplication, etc.  Basically, this is a stateful protocol
> on
> > an
> > > > > >>>>>>> unreliable network, and you need to know whether the
> follower
> > > > got the
> > > > > >>>>>>> previous data you sent before you move on.  And you need to
> > > > handle
> > > > > >>>>>>> issues like duplicated or delayed requests.  These issues
> do
> > > not
> > > > affect
> > > > > >>>>>>> the full fetch request, because it is not stateful-- any
> full
> > > > fetch
> > > > > >>>>>>> request can be understood and properly responded to in
> > > isolation.
> > > > > >>>>>>>
> > > > > > >>>>>> Thanks for the explanation. This makes sense. On the other
> > > > > > >>>>>> hand, I would be interested in learning more about whether
> > > > > > >>>>>> Becket's solution can help simplify the protocol by not having
> > > > > > >>>>>> the epoch field and whether that is worth doing.
> > > > > >>>>> Hi Dong,
> > > > > >>>>>
> > > > > >>>>> I commented about this in the other thread.  A solution which
> > > > doesn't
> > > > > >>>>> maintain session information doesn't work here.
> > > > > >>>>>
> > > > > > >>>>>>>> 2. It is said that Incremental FetchRequest will include
> > > > > > >>>>>>>> partitions whose fetch offset or maximum number of fetch bytes
> > > > > > >>>>>>>> has been changed. If the follower's logStartOffset of a
> > > > > > >>>>>>>> partition has changed, should this partition also be included
> > > > > > >>>>>>>> in the next FetchRequest to the leader? Otherwise, it may
> > > > > > >>>>>>>> affect the handling of DeleteRecordsRequest because the leader
> > > > > > >>>>>>>> may not know the corresponding data has been deleted on the
> > > > > > >>>>>>>> follower.
> > > > > >>>>>>> Yeah, the follower should include the partition if the
> > > > logStartOffset
> > > > > >>>>>>> has changed.  That should be spelled out on the KIP.
> Fixed.
> > > > > >>>>>>>
> > > > > >>>>>>>> 3. In the section "Per-Partition Data", a partition is not
> > > > considered
> > > > > >>>>>>>> dirty if its log start offset has changed. Later in the
> > > section
> > > > > >>>>>>> "FetchRequest
> > > > > >>>>>>>> Changes", it is said that incremental fetch responses will
> > > > include a
> > > > > >>>>>>>> partition if its logStartOffset has changed. It seems
> > > > inconsistent.
> > > > > >>>>> Can
> > > > > >>>>>>>> you update the KIP to clarify it?
> > > > > >>>>>>>>
> > > > > >>>>>>> In the "Per-Partition Data" section, it does say that
> > > > logStartOffset
> > > > > >>>>>>> changes make a partition dirty, though, right?  The first
> > > bullet
> > > > point
> > > > > >>>>>>> is:
> > > > > >>>>>>>
> > > > > >>>>>>>> * The LogCleaner deletes messages, and this changes the
> log
> > > > start
> > > > > >>>>> offset
> > > > > >>>>>>> of the partition on the leader., or
> > > > > >>>>>>>
> > > > > > >>>>>> Ah I see. I think I didn't notice this because the statement
> > > > > > >>>>>> assumes that the LogStartOffset in the leader only changes due
> > > > > > >>>>>> to LogCleaner. In fact the LogStartOffset can change on the
> > > > > > >>>>>> leader due to either log retention or DeleteRecordsRequest. I
> > > > > > >>>>>> haven't verified whether LogCleaner can change LogStartOffset
> > > > > > >>>>>> though. It may be a bit better to just say that a
> > > > > > >>>>>> partition is considered dirty if LogStartOffset changes.
> > > > > >>>>> I agree.  It should be straightforward to just resend the
> > > > partition if
> > > > > >>>>> logStartOffset changes.
> > > > > >>>>>
> > > > > >>>>>>>> 4. In "Fetch Session Caching" section, it is said that
> each
> > > > broker
> > > > > >>>>> has a
> > > > > >>>>>>>> limited number of slots. How is this number determined?
> Does
> > > > this
> > > > > >>>>> require
> > > > > >>>>>>>> a new broker config for this number?
> > > > > >>>>>>> Good point.  I added two broker configuration parameters to
> > > > control
> > > > > >>>>> this
> > > > > >>>>>>> number.
> > > > > >>>>>>>
> > > > > >>>>>> I am curious to see whether we can avoid some of these new
> > > > configs. For
> > > > > >>>>>> example, incremental.fetch.session.cache.slots.per.broker
> is
> > > > probably
> > > > > >>>>> not
> > > > > >>>>>> necessary because if a leader knows that a FetchRequest
> comes
> > > > from a
> > > > > >>>>>> follower, we probably want the leader to always cache the
> > > > information
> > > > > >>>>>> from that follower. Does this make sense?
> > > > > >>>>> Yeah, maybe we can avoid having
> > > > > >>>>> incremental.fetch.session.cache.slots.per.broker.
> > > > > >>>>>
> > > > > > >>>>>> Maybe we can discuss the config later after there is agreement
> > > > > > >>>>>> on what the protocol would look like.
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>>>>> What is the error code if broker does
> > > > > >>>>>>>> not have new log for the incoming FetchRequest?
> > > > > >>>>>>> Hmm, is there a typo in this question?  Maybe you meant to
> > ask
> > > > what
> > > > > >>>>>>> happens if there is no new cache slot for the incoming
> > > > FetchRequest?
> > > > > >>>>>>> That's not an error-- the incremental fetch session ID just
> > > gets
> > > > set to
> > > > > >>>>>>> 0, indicating no incremental fetch session was created.
> > > > > >>>>>>>
> > > > > >>>>>> Yeah there is a typo. You have answered my question.
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > > >>>>>>>> 5. Can you clarify what happens if the follower adds a
> > > > > > >>>>>>>> partition to the ReplicaFetcherThread after receiving
> > > > > > >>>>>>>> LeaderAndIsrRequest? Does the leader need to generate a new
> > > > > > >>>>>>>> session for this ReplicaFetcherThread or does it re-use
> > > > > > >>>>>>>> the existing session?  If it uses a new session, is the old
> > > > > > >>>>>>>> session actively deleted from the slot?
> > > > > >>>>>>> The basic idea is that you can't make changes, except by
> > > sending
> > > > a full
> > > > > >>>>>>> fetch request.  However, perhaps we can allow the client to
> > > > re-use its
> > > > > >>>>>>> existing session ID.  If the client sets sessionId = id,
> > epoch
> > > =
> > > > 0, it
> > > > > >>>>>>> could re-initialize the session.
> > > > > >>>>>>>
> > > > > >>>>>> Yeah I agree with the basic idea. We probably want to
> > understand
> > > > more
> > > > > >>>>>> detail about how this works later.
> > > > > >>>>> Sounds good.  I updated the KIP with this information.  A
> > > > > >>>>> re-initialization should be exactly the same as an
> > > initialization,
> > > > > >>>>> except that it reuses an existing ID.
> > > > > >>>>>
> > > > > >>>>> best,
> > > > > >>>>> Colin
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>>>>> BTW, I think it may be useful if the KIP can include the
> > > example
> > > > > >>>>> workflow
> > > > > >>>>>>>> of how this feature will be used in case of partition
> change
> > > > and so
> > > > > >>>>> on.
> > > > > >>>>>>> Yeah, that might help.
> > > > > >>>>>>>
> > > > > >>>>>>> best,
> > > > > >>>>>>> Colin
> > > > > >>>>>>>
> > > > > >>>>>>>> Thanks,
> > > > > >>>>>>>> Dong
> > > > > >>>>>>>>
> > > > > >>>>>>>>
> > > > > > >>>>>>>> On Wed, Nov 29, 2017 at 12:13 PM, Colin McCabe <
> > > > cmcc...@apache.org>
> > > > > >>>>>>>> wrote:
> > > > > >>>>>>>>
> > > > > >>>>>>>>> I updated the KIP with the ideas we've been discussing.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> best,
> > > > > >>>>>>>>> Colin
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> On Tue, Nov 28, 2017, at 08:38, Colin McCabe wrote:
> > > > > >>>>>>>>>> On Mon, Nov 27, 2017, at 22:30, Jan Filipiak wrote:
> > > > > >>>>>>>>>>> Hi Colin, thank you  for this KIP, it can become a
> really
> > > > > >>>>> useful
> > > > > >>>>>>> thing.
> > > > > >>>>>>>>>>> I just scanned through the discussion so far and wanted
> > to
> > > > > >>>>> start a
> > > > > > >>>>>>>>>>> thread to make a decision about keeping the
> > > > > >>>>>>>>>>> cache with the Connection / Session or having some sort
> > of
> > > > UUID
> > > > > > >>>>>>> indexed
> > > > > >>>>>>>>>>> global Map.
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> Sorry if that has been settled already and I missed it.
> > In
> > > > this
> > > > > >>>>>>> case
> > > > > >>>>>>>>>>> could anyone point me to the discussion?
> > > > > >>>>>>>>>> Hi Jan,
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> I don't think anyone has discussed the idea of tying the
> > > cache
> > > > > >>>>> to an
> > > > > >>>>>>>>>> individual TCP session yet.  I agree that since the
> cache
> > is
> > > > > >>>>>>> intended to
> > > > > >>>>>>>>>> be used only by a single follower or client, it's an
> > > > interesting
> > > > > >>>>>>> thing
> > > > > >>>>>>>>>> to think about.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> I guess the obvious disadvantage is that whenever your
> TCP
> > > > > >>>>> session
> > > > > >>>>>>>>>> drops, you have to make a full fetch request rather than
> > an
> > > > > >>>>>>> incremental
> > > > > >>>>>>>>>> one.  It's not clear to me how often this happens in
> > > practice
> > > > --
> > > > > >>>>> it
> > > > > >>>>>>>>>> probably depends a lot on the quality of the network.
> > From
> > > a
> > > > > >>>>> code
> > > > > >>>>>>>>>> perspective, it might also be a bit difficult to access
> > data
> > > > > >>>>>>> associated
> > > > > >>>>>>>>>> with the Session from classes like KafkaApis (although
> we
> > > > could
> > > > > >>>>>>> refactor
> > > > > >>>>>>>>>> it to make this easier).
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> It's also clear that even if we tie the cache to the
> > > session,
> > > > we
> > > > > >>>>>>> still
> > > > > >>>>>>>>>> have to have limits on the number of caches we're
> willing
> > to
> > > > > >>>>> create.
> > > > > >>>>>>>>>> And probably we should reserve some cache slots for each
> > > > > >>>>> follower, so
> > > > > >>>>>>>>>> that clients don't take all of them.
> > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>>> I'd rather see a protocol in which the client is hinting
> > the
> > > > > >>>>> broker
> > > > > >>>>>>>>> that,
> > > > > >>>>>>>>>>> he is going to use the feature instead of a client
> > > > > >>>>>>>>>>> realizing that the broker just offered the feature
> > > > (regardless
> > > > > >>>>> of
> > > > > >>>>>>>>>>> protocol version which should only indicate that the
> > > feature
> > > > > >>>>>>>>>>> would be usable).
> > > > > >>>>>>>>>> Hmm.  I'm not sure what you mean by "hinting."  I do
> think
> > > > that
> > > > > >>>>> the
> > > > > >>>>>>>>>> server should have the option of not accepting
> incremental
> > > > > >>>>> requests
> > > > > >>>>>>> from
> > > > > >>>>>>>>>> specific clients, in order to save memory space.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> This seems to work better with a per
> > > > > >>>>>>>>>>> connection/session attached Metadata than with a Map
> and
> > > > could
> > > > > >>>>>>> allow
> > > > > >>>>>>>>> for
> > > > > >>>>>>>>>>> easier client implementations.
> > > > > >>>>>>>>>>> It would also make Client-side code easier as there
> > > wouldn't
> > > > > >>>>> be any
> > > > > >>>>>>>>>>> Cache-miss error Messages to handle.
> > > > > >>>>>>>>>> It is nice not to have to handle cache-miss responses, I
> > > > agree.
> > > > > >>>>>>>>>> However, TCP sessions aren't exposed to most of our
> > > > client-side
> > > > > >>>>> code.
> > > > > >>>>>>>>>> For example, when the Producer creates a message and
> hands
> > > it
> > > > > >>>>> off to
> > > > > >>>>>>> the
> > > > > >>>>>>>>>> NetworkClient, the NC will transparently re-connect and
> > > > re-send a
> > > > > >>>>>>>>>> message if the first send failed.  The higher-level code
> > > will
> > > > > >>>>> not be
> > > > > >>>>>>>>>> informed about whether the TCP session was
> re-established,
> > > > > >>>>> whether an
> > > > > >>>>>>>>>> existing TCP session was used, and so on.  So overall I
> > > would
> > > > > >>>>> still
> > > > > >>>>>>> lean
> > > > > >>>>>>>>>> towards not coupling this to the TCP session...
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> best,
> > > > > >>>>>>>>>> Colin
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>>     Thank you again for the KIP. And again, if this was
> > > > clarified
> > > > > >>>>>>> already
> > > > > >>>>>>>>>>> please drop me a hint where I could read about it.
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> Best Jan
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> On 21.11.2017 22:02, Colin McCabe wrote:
> > > > > >>>>>>>>>>>> Hi all,
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> I created a KIP to improve the scalability and latency
> > of
> > > > > >>>>>>>>> FetchRequest:
> > > > > >>>>>>>>>>>> https://cwiki.apache.org/
> confluence/display/KAFKA/KIP-
> > > > > >>>>>>>>> 227%3A+Introduce+Incremental+FetchRequests+to+Increase+
> > > > > >>>>>>>>> Partition+Scalability
> > > > > >>>>>>>>>>>> Please take a look.
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> cheers,
> > > > > >>>>>>>>>>>> Colin
> > > > >
> > > >
> > >
> >
>
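
For readers following the quoted discussion, here is a rough sketch of the session behaviour described there: a full fetch request may create a session, a session ID of 0 in the response means no incremental session was created (for example, no free cache slot), and a full fetch with an existing session ID and epoch = 0 re-initializes that session while keeping its ID. This is only a toy model under those assumptions. The class name FetchSessionCache, the method handle_full_fetch, and the max_slots parameter are hypothetical and do not come from the KIP or the Kafka code base.

```python
class FetchSessionCache:
    """Toy model of a broker-side incremental fetch session cache (hypothetical)."""

    # Per the thread: a session ID of 0 means no incremental session was created.
    NO_SESSION = 0

    def __init__(self, max_slots):
        self.max_slots = max_slots   # assumed limit on the number of cached sessions
        self.sessions = {}           # session_id -> {"epoch": int, "partitions": set}
        self.next_id = 1             # IDs start at 1 because 0 is reserved

    def handle_full_fetch(self, session_id, epoch, partitions):
        """Handle a full fetch request and return the session ID for the response."""
        # Re-initialization: same as initialization, except the existing ID is reused.
        if session_id in self.sessions and epoch == 0:
            self.sessions[session_id] = {"epoch": 0, "partitions": set(partitions)}
            return session_id
        # No free slot: not an error, the response simply carries session ID 0.
        if len(self.sessions) >= self.max_slots:
            return self.NO_SESSION
        # Otherwise create a new session in a free slot.
        new_id = self.next_id
        self.next_id += 1
        self.sessions[new_id] = {"epoch": 0, "partitions": set(partitions)}
        return new_id
```

With max_slots=1, a first full fetch gets a new session ID, a later full fetch with that ID and epoch 0 replaces the partition set in place, and a full fetch from a second client gets 0 back because the only slot is taken.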
