On Sun, Dec 10, 2017, at 22:10, Colin McCabe wrote:
> On Fri, Dec 8, 2017, at 01:16, Jan Filipiak wrote:
> > Hi,
> > 
> > sorry for the late reply, busy times :-/
> > 
> > I would ask you one thing maybe. Since the timeout
> > argument seems to be settled I have no further argument
> > form your side except the "i don't want to".
> > 
> > Can you see that connection.max.idle.max is the exact time
> > that expresses "We expect the client to be away for this long,
> > and come back and continue"?
> 
> Hi Jan,
> 
> Sure, connection.max.idle.max is the exact time that we want to keep
> around a TCP session.  TCP sessions are relatively cheap, so we can
> afford to keep them around for 10 minutes by default.  Incremental fetch
> state is less cheap, so we want to set a shorter timeout for it.  We
> also want new TCP sessions to be able to reuse an existing incremental
> fetch session rather than creating a new one and waiting for the old one
> to time out.
> 
> > 
> > also clarified some stuff inline
> > 
> > Best Jan
> > 
> > 
> > 
> > 
> > On 05.12.2017 23:14, Colin McCabe wrote:
> > > On Tue, Dec 5, 2017, at 13:13, Jan Filipiak wrote:
> > >> Hi Colin
> > >>
> > >> Addressing the topic of how to manage slots from the other thread.
> > >> With tcp connections all this comes for free essentially.
> > > Hi Jan,
> > >
> > > I don't think that it's accurate to say that cache management "comes for
> > > free" by coupling the incremental fetch session with the TCP session.
> > > When a new TCP session is started by a fetch request, you still have to
> > > decide whether to grant that request an incremental fetch session or
> > > not.  If your answer is that you always grant the request, I would argue
> > > that you do not have cache management.
> > First I would say, the client has a big say in this. If the client
> > is not going to issue incremental he shouldn't ask for a cache
> > when the client ask for the cache we still have all options to deny.
> 
> To put it simply, we have to have some cache management above and beyond
> just giving out an incremental fetch session to anyone who has a TCP
> session.  Therefore, caching does not become simpler if you couple the
> fetch session to the TCP session.
> 
> > 
> > >
> > > I guess you could argue that timeouts are cache management, but I don't
> > > find that argument persuasive.  Anyone could just create a lot of TCP
> > > sessions and use a lot of resources, in that case.  So there is
> > > essentially no limit on memory use.  In any case, TCP sessions don't
> > > help us implement fetch session timeouts.
> > We still have all the options denying the request to keep the state.
> > What you want seems like a max connections / ip safeguard.
> > I can currently take down a broker with to many connections easily.
> > 
> > 
> > >> I still would argue we disable it by default and make a flag in the
> > >> broker to ask the leader to maintain the cache while replicating and 
> > >> also only
> > >> have it optional in consumers (default to off) so one can turn it on
> > >> where it really hurts.  MirrorMaker and audit consumers prominently.
> > > I agree with Jason's point from earlier in the thread.  Adding extra
> > > configuration knobs that aren't really necessary can harm usability.
> > > Certainly asking people to manually turn on a feature "where it really
> > > hurts" seems to fall in that category, when we could easily enable it
> > > automatically for them.
> > This doesn't make much sense to me.
> 
> There are no tradeoffs to think about from the client's point of view:
> it always wants an incremental fetch session.  So there is no benefit to
> making the clients configure an extra setting.  Updating and managing
> client configurations is also more difficult than managing broker
> configurations for most users.
> 
> > You also wanted to implement
> > a "turn of in case of bug"-knob. Having the client indicate if the
> > feauture will be used seems reasonable to me.,
> 
> True.  However, if there is a bug, we could also roll back the client,
> so having this configuration knob is not strictly required.
> 
> > >
> > >> Otherwise I left a few remarks in-line, which should help to understand
> > >> my view of the situation better
> > >>
> > >> Best Jan
> > >>
> > >>
> > >> On 05.12.2017 08:06, Colin McCabe wrote:
> > >>> On Mon, Dec 4, 2017, at 02:27, Jan Filipiak wrote:
> > >>>> On 03.12.2017 21:55, Colin McCabe wrote:
> > >>>>> On Sat, Dec 2, 2017, at 23:21, Becket Qin wrote:
> > >>>>>> Thanks for the explanation, Colin. A few more questions.
> > >>>>>>
> > >>>>>>> The session epoch is not complex.  It's just a number which 
> > >>>>>>> increments
> > >>>>>>> on each incremental fetch.  The session epoch is also useful for
> > >>>>>>> debugging-- it allows you to match up requests and responses when
> > >>>>>>> looking at log files.
> > >>>>>> Currently each request in Kafka has a correlation id to help match 
> > >>>>>> the
> > >>>>>> requests and responses. Is epoch doing something differently?
> > >>>>> Hi Becket,
> > >>>>>
> > >>>>> The correlation ID is used within a single TCP session, to uniquely
> > >>>>> associate a request with a response.  The correlation ID is not unique
> > >>>>> (and has no meaning) outside the context of that single TCP session.
> > >>>>>
> > >>>>> Keep in mind, NetworkClient is in charge of TCP sessions, and 
> > >>>>> generally
> > >>>>> tries to hide that information from the upper layers of the code.  So
> > >>>>> when you submit a request to NetworkClient, you don't know if that
> > >>>>> request creates a TCP session, or reuses an existing one.
> > >>>>>>> Unfortunately, this doesn't work.  Imagine the client misses an
> > >>>>>>> increment fetch response about a partition.  And then the partition 
> > >>>>>>> is
> > >>>>>>> never updated after that.  The client has no way to know about the
> > >>>>>>> partition, since it won't be included in any future incremental 
> > >>>>>>> fetch
> > >>>>>>> responses.  And there are no offsets to compare, since the 
> > >>>>>>> partition is
> > >>>>>>> simply omitted from the response.
> > >>>>>> I am curious about in which situation would the follower miss a 
> > >>>>>> response
> > >>>>>> of a partition. If the entire FetchResponse is lost (e.g. timeout), 
> > >>>>>> the
> > >>>>>> follower would disconnect and retry. That will result in sending a 
> > >>>>>> full
> > >>>>>> FetchRequest.
> > >>>>> Basically, you are proposing that we rely on TCP for reliable delivery
> > >>>>> in a distributed system.  That isn't a good idea for a bunch of
> > >>>>> different reasons.  First of all, TCP timeouts tend to be very long.  
> > >>>>> So
> > >>>>> if the TCP session timing out is your error detection mechanism, you
> > >>>>> have to wait minutes for messages to timeout.  Of course, we add a
> > >>>>> timeout on top of that after which we declare the connection bad and
> > >>>>> manually close it.  But just because the session is closed on one end
> > >>>>> doesn't mean that the other end knows that it is closed.  So the 
> > >>>>> leader
> > >>>>> may have to wait quite a long time before TCP decides that yes,
> > >>>>> connection X from the follower is dead and not coming back, even 
> > >>>>> though
> > >>>>> gremlins ate the FIN packet which the follower attempted to translate.
> > >>>>> If the cache state is tied to that TCP session, we have to keep that
> > >>>>> cache around for a much longer time than we should.
> > >>>> Hi,
> > >>>>
> > >>>> I see this from a different perspective. The cache expiry time
> > >>>> has the same semantic as idle connection time in this scenario.
> > >>>> It is the time range we expect the client to come back an reuse
> > >>>> its broker side state. I would argue that on close we would get an
> > >>>> extra shot at cleaning up the session state early. As opposed to
> > >>>> always wait for that duration for expiry to happen.
> > >>> Hi Jan,
> > >>>
> > >>> The idea here is that the incremental fetch cache expiry time can be
> > >>> much shorter than the TCP session timeout.  In general the TCP session
> > >>> timeout is common to all TCP connections, and very long.  To make these
> > >>> numbers a little more concrete, the TCP session timeout is often
> > >>> configured to be 2 hours on Linux.  (See
> > >>> https://www.cyberciti.biz/tips/linux-increasing-or-decreasing-tcp-sockets-timeouts.html
> > >>> )  The timeout I was proposing for incremental fetch sessions was one or
> > >>> two minutes at most.
> > >> Currently this is taken care of by
> > >> connections.max.idle.ms on the broker and defaults to something of few
> > >> minutes.
> > > It is 10 minutes by default, which is longer than what we want the
> > > incremental fetch session timeout to be.  There's no reason to couple
> > > these two things.
> > >
> > >> Also something we could let the client change if we really wanted to.
> > >> So there is no need to worry about coupling our implementation to some
> > >> timeouts given by the OS, with TCP one always has full control over the 
> > >> worst
> > >> times + one gets the extra shot cleaning up early when the close comes 
> > >> through.
> > >> Which is the majority of the cases.
> > > In the majority of cases, the TCP session will be re-established.  In
> > > that case, we have to send a full fetch request rather than an
> > > incremental fetch request.
> > I actually have a hard time believing this. Do you have any numbers of
> > any existing production system? Is it the virtualisation layer cutting 
> > all the connections?
> > We see this only on application crashes and restarts where the app needs 
> > todo the full anyways
> > as it probably continues with stores offsets.
> 
> Yes, TCP connections get dropped.  It happens very often in production
> clusters, actually.  When I was working on Hadoop, one of the most
> common questions I heard from newcomers was "why do I see so many
> EOFException messages in the logs"?  The other thing that happens a lot
> is DNS outages or slowness.  Public clouds seem to have even more
> unstable networks than the on-premise clusters.  I am not sure why that
> is.
> 
> > >
> > >>>>> Secondly, from a software engineering perspective, it's not a good 
> > >>>>> idea
> > >>>>> to try to tightly tie together TCP and our code.  We would have to
> > >>>>> rework how we interact with NetworkClient so that we are aware of 
> > >>>>> things
> > >>>>> like TCP sessions closing or opening.  We would have to be careful
> > >>>>> preserve the ordering of incoming messages when doing things like
> > >>>>> putting incoming requests on to a queue to be processed by multiple
> > >>>>> threads.  It's just a lot of complexity to add, and there's no upside.
> > >>>> I see the point here. And I had a small chat with Dong Lin already
> > >>>> making me aware of this. I tried out the approaches and propose the
> > >>>> following:
> > >>>>
> > >>>> The client start and does a full fetch. It then does incremental 
> > >>>> fetches.
> > >>>> The connection to the broker dies and is re-established by 
> > >>>> NetworkClient
> > >>>> under the hood.
> > >>>> The broker sees an incremental fetch without having state => returns
> > >>>> error:
> > >>>> Client sees the error, does a full fetch and goes back to incrementally
> > >>>> fetching.
> > >>>>
> > >>>> having this 1 additional error round trip is essentially the same as
> > >>>> when something
> > >>>> with the sessions or epoch changed unexpectedly to the client (say
> > >>>> expiry).
> > >>>>
> > >>>> So its nothing extra added but the conditions are easier to evaluate.
> > >>>> Especially since we do everything with NetworkClient. Other 
> > >>>> implementers
> > >>>> on the
> > >>>> protocol are free to optimizes this and do not do the errornours
> > >>>> roundtrip on the
> > >>>> new connection.
> > >>>> Its a great plus that the client can know when the error is gonna
> > >>>> happen. instead of
> > >>>> the server to always have to report back if something changes
> > >>>> unexpectedly for the client
> > >>> You are assuming that the leader and the follower agree that the TCP
> > >>> session drops at the same time.  When there are network problems, this
> > >>> may not be true.  The leader may still think the previous TCP session is
> > >>> active.  In that case, we have to keep the incremental fetch session
> > >>> state around until we learn otherwise (which could be up to that 2 hour
> > >>> timeout I mentioned).  And if we get a new incoming incremental fetch
> > >>> request, we can't assume that it replaces the previous one, because the
> > >>> IDs will be different (the new one starts a new session).
> > >> As mentioned, no reason to fear some time-outs out of our control
> > >>>>> Imagine that I made an argument that client IDs are "complex" and 
> > >>>>> should
> > >>>>> be removed from our APIs.  After all, we can just look at the remote 
> > >>>>> IP
> > >>>>> address and TCP port of each connection.  Would you think that was a
> > >>>>> good idea?  The client ID is useful when looking at logs.  For 
> > >>>>> example,
> > >>>>> if a rebalance is having problems, you want to know what clients were
> > >>>>> having a problem.  So having the client ID field to guide you is
> > >>>>> actually much less "complex" in practice than not having an ID.
> > >>>> I still cant follow why the correlation idea will not help here.
> > >>>> Correlating logs with it usually works great. Even with primitive tools
> > >>>> like grep
> > >>> The correlation ID does help somewhat, but certainly not as much as a
> > >>> unique 64-bit ID.  The correlation ID is not unique in the broker, just
> > >>> unique to a single NetworkClient.  Simiarly, the correlation ID is not
> > >>> unique on the client side, if there are multiple Consumers, etc.
> > >> Can always bump entropy in correlation IDs, never had a problem
> > >> of finding to many duplicates. Would be a different KIP though.
> > >>>>> Similarly, if metadata responses had epoch numbers (simple 
> > >>>>> incrementing
> > >>>>> numbers), we would not have to debug problems like clients 
> > >>>>> accidentally
> > >>>>> getting old metadata from servers that had been partitioned off from 
> > >>>>> the
> > >>>>> network for a while.  Clients would know the difference between old 
> > >>>>> and
> > >>>>> new metadata.  So putting epochs in to the metadata request is much 
> > >>>>> less
> > >>>>> "complex" operationally, even though it's an extra field in the 
> > >>>>> request.
> > >>>>>     This has been discussed before on the mailing list.
> > >>>>>
> > >>>>> So I think the bottom line for me is that having the session ID and
> > >>>>> session epoch, while it adds two extra fields, reduces operational
> > >>>>> complexity and increases debuggability.  It avoids tightly coupling us
> > >>>>> to assumptions about reliable ordered delivery which tend to be 
> > >>>>> violated
> > >>>>> in practice in multiple layers of the stack.  Finally, it  avoids the
> > >>>>> necessity of refactoring NetworkClient.
> > >>>> So there is stacks out there that violate TCP guarantees? And software
> > >>>> still works? How can this be? Can you elaborate a little where this
> > >>>> can be violated? I am not very familiar with virtualized environments
> > >>>> but they can't really violate TCP contracts.
> > >>> TCP's guarantees of reliable, in-order transmission certainly can be
> > >>> violated.  For example, I once had to debug a cluster where a certain
> > >>> node had a network card which corrupted its transmissions occasionally.
> > >>> With all the layers of checksums, you would think that this was not
> > >>> possible, but it happened.  We occasionally got corrupted data written
> > >>> to disk on the other end because of it.  Even more frustrating, the data
> > >>> was not corrupted on disk on the sending node-- it was a bug in the
> > >>> network card driver that was injecting the errors.
> > >> true, but your broker might aswell read a corrupted 600GB as size from
> > >> the network and die with OOM instantly.
> > > If you read 600 GB as the size from the network, you will not "die with
> > > OOM instantly."  That would be a bug.  Instead, you will notice that 600
> > > GB is greater than max.message.bytes, and close the connection.
> > We only check max.message.bytes to late to guard against consumer
> > stalling.
> > we dont have a notion of max.networkpacket.size before we allocate the 
> > bytebuffer to read it into.
> 
> "network packets" are not the same thing as "kafka RPCs."  One Kafka RPC
> could take up mutiple ethernet packets.
> 
> Also, max.message.bytes has nothing to do with "consumer stalling" --
> you are probably thinking about some of the fetch request
> configurations.  max.message.bytes is used by the RPC system to figure
> out whether to read the full incoming RP

Whoops, this is incorrect.  I was thinking about
"socket.request.max.bytes" rather than "max.message.bytes."  Sorry about
that.  See Ismael's email as well.

best,
Colin

> 
> best,
> Colin
> 
> > 
> > >
> > >> Optimizing for still having functional
> > >> software under this circumstances is not reasonable.
> > >> You want to get rid of such a
> > >> node ASAP and pray that zookeepers ticks get corrupted often enough
> > >> that it finally drops out of the cluster.
> > >>
> > >> There is a good reason that these kinda things
> > >> https://issues.apache.org/jira/browse/MESOS-4105
> > >> don't end up as kafka Jiras. In the end you can't run any software in
> > >> these containers anymore. Application layer checksums are a neat thing to
> > >> fail fast but trying to cope with this probably causes more bad than
> > >> good.  So I would argue that we shouldn't try this for the fetch 
> > >> requests.
> > > One of the goals of Apache Kafka is to be "a streaming platform...
> > > [that] lets you store streams of records in a fault-tolerant way."  For
> > > more information, see https://kafka.apache.org/intro .  Fault-tolerance
> > > is explicitly part of the goal of Kafka.  Prayer should be optional, not
> > > required, when running the software.
> > Yes, we need to fail ASAP when we read corrupted packages. It seemed
> > to me like you tried to make the case for pray and try to stay alive.
> > Fault
> > tolerance here means. I am a fishy box i am going to let a good box
> > handle
> > it and be silent until i get fixed up.
> > >
> > > Crashing because someone sent you a bad packet is not reasonable
> > > behavior.  It is a bug.  Similarly, bringing down the whole cluster,
> > > which could a hundred nodes, because someone had a bad network adapter
> > > is not reasonable behavior.  It is perhaps reasonable for the cluster to
> > > perform worse when hardware is having problems.  But that's a different
> > > discussion.
> > See above.
> > >
> > > best,
> > > Colin
> > >
> > >>
> > >>> However, my point was not about TCP's guarantees being violated.  My
> > >>> point is that TCP's guarantees are only one small building block to
> > >>> build a robust distributed system.  TCP basically just says that if you
> > >>> get any bytes from the stream, you will get the ones that were sent by
> > >>> the sender, in the order they were sent.  TCP does not guarantee that
> > >>> the bytes you send will get there.  It does not guarantee that if you
> > >>> close the connection, the other end will know about it in a timely
> > >>> fashion.
> > >> These are very powerful grantees and since we use TCP we should
> > >> piggy pack everything that is reasonable on to it. IMO there is no
> > >> need to reimplement correct sequencing again if you get that from
> > >> your transport layer. It saves you the complexity, it makes
> > >> you application behave way more naturally and your api easier to
> > >> understand.
> > >>
> > >> There is literally nothing the Kernel wont let you decide
> > >> especially not any timings. Only noticeable exception being TIME_WAIT
> > >> of usually 240 seconds but that already has little todo with the broker
> > >> itself and
> > >> if we are running out of usable ports because of this then expiring
> > >> fetch requests
> > >> wont help much anyways.
> > >>
> > >> I hope I could strengthen the trust you have in userland TCP connection
> > >> management. It is really powerful and can be exploited for maximum gains
> > >> without much risk in my opinion.
> > >>
> > >>
> > >>
> > >>> It does not guarantee that the bytes will be received in a
> > >>> certain timeframe, and certainly doesn't guarantee that if you send a
> > >>> byte on connection X and then on connection Y, that the remote end will
> > >>> read a byte on X before reading a byte on Y.
> > >> Noone expects this from two independent paths of any kind.
> > >>
> > >>> best,
> > >>> Colin
> > >>>
> > >>>> Hope this made my view clearer, especially the first part.
> > >>>>
> > >>>> Best Jan
> > >>>>
> > >>>>
> > >>>>> best,
> > >>>>> Colin
> > >>>>>
> > >>>>>
> > >>>>>> If there is an error such as NotLeaderForPartition is
> > >>>>>> returned for some partitions, the follower can always send a full
> > >>>>>> FetchRequest. Is there a scenario that only some of the partitions 
> > >>>>>> in a
> > >>>>>> FetchResponse is lost?
> > >>>>>>
> > >>>>>> Thanks,
> > >>>>>>
> > >>>>>> Jiangjie (Becket) Qin
> > >>>>>>
> > >>>>>>
> > >>>>>> On Sat, Dec 2, 2017 at 2:37 PM, Colin McCabe<cmcc...@apache.org>  
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> On Fri, Dec 1, 2017, at 11:46, Dong Lin wrote:
> > >>>>>>>> On Thu, Nov 30, 2017 at 9:37 AM, Colin McCabe<cmcc...@apache.org>
> > >>>>>>> wrote:
> > >>>>>>>>> On Wed, Nov 29, 2017, at 18:59, Dong Lin wrote:
> > >>>>>>>>>> Hey Colin,
> > >>>>>>>>>>
> > >>>>>>>>>> Thanks much for the update. I have a few questions below:
> > >>>>>>>>>>
> > >>>>>>>>>> 1. I am not very sure that we need Fetch Session Epoch. It seems 
> > >>>>>>>>>> that
> > >>>>>>>>>> Fetch
> > >>>>>>>>>> Session Epoch is only needed to help leader distinguish between 
> > >>>>>>>>>> "a
> > >>>>>>> full
> > >>>>>>>>>> fetch request" and "a full fetch request and request a new
> > >>>>>>> incremental
> > >>>>>>>>>> fetch session". Alternatively, follower can also indicate "a full
> > >>>>>>> fetch
> > >>>>>>>>>> request and request a new incremental fetch session" by setting 
> > >>>>>>>>>> Fetch
> > >>>>>>>>>> Session ID to -1 without using Fetch Session Epoch. Does this 
> > >>>>>>>>>> make
> > >>>>>>> sense?
> > >>>>>>>>> Hi Dong,
> > >>>>>>>>>
> > >>>>>>>>> The fetch session epoch is very important for ensuring 
> > >>>>>>>>> correctness.  It
> > >>>>>>>>> prevents corrupted or incomplete fetch data due to network 
> > >>>>>>>>> reordering
> > >>>>>>> or
> > >>>>>>>>> loss.
> > >>>>>>>>>
> > >>>>>>>>> For example, consider a scenario where the follower sends a fetch
> > >>>>>>>>> request to the leader.  The leader responds, but the response is 
> > >>>>>>>>> lost
> > >>>>>>>>> because of network problems which affected the TCP session.  In 
> > >>>>>>>>> that
> > >>>>>>>>> case, the follower must establish a new TCP session and re-send 
> > >>>>>>>>> the
> > >>>>>>>>> incremental fetch request.  But the leader does not know that the
> > >>>>>>>>> follower didn't receive the previous incremental fetch response.  
> > >>>>>>>>> It is
> > >>>>>>>>> only the incremental fetch epoch which lets the leader know that 
> > >>>>>>>>> it
> > >>>>>>>>> needs to resend that data, and not data which comes afterwards.
> > >>>>>>>>>
> > >>>>>>>>> You could construct similar scenarios with message reordering,
> > >>>>>>>>> duplication, etc.  Basically, this is a stateful protocol on an
> > >>>>>>>>> unreliable network, and you need to know whether the follower got 
> > >>>>>>>>> the
> > >>>>>>>>> previous data you sent before you move on.  And you need to handle
> > >>>>>>>>> issues like duplicated or delayed requests.  These issues do not 
> > >>>>>>>>> affect
> > >>>>>>>>> the full fetch request, because it is not stateful-- any full 
> > >>>>>>>>> fetch
> > >>>>>>>>> request can be understood and properly responded to in isolation.
> > >>>>>>>>>
> > >>>>>>>> Thanks for the explanation. This makes sense. On the other hand I 
> > >>>>>>>> would
> > >>>>>>>> be interested in learning more about whether Becket's solution can 
> > >>>>>>>> help
> > >>>>>>>> simplify the protocol by not having the echo field and whether 
> > >>>>>>>> that is
> > >>>>>>>> worth doing.
> > >>>>>>> Hi Dong,
> > >>>>>>>
> > >>>>>>> I commented about this in the other thread.  A solution which 
> > >>>>>>> doesn't
> > >>>>>>> maintain session information doesn't work here.
> > >>>>>>>
> > >>>>>>>>>> 2. It is said that Incremental FetchRequest will include 
> > >>>>>>>>>> partitions
> > >>>>>>> whose
> > >>>>>>>>>> fetch offset or maximum number of fetch bytes has been changed. 
> > >>>>>>>>>> If
> > >>>>>>>>>> follower's logStartOffet of a partition has changed, should this
> > >>>>>>>>>> partition also be included in the next FetchRequest to the 
> > >>>>>>>>>> leader?
> > >>>>>>>>> Otherwise, it
> > >>>>>>>>>> may affect the handling of DeleteRecordsRequest because leader 
> > >>>>>>>>>> may
> > >>>>>>> not
> > >>>>>>>>> know
> > >>>>>>>>>> the corresponding data has been deleted on the follower.
> > >>>>>>>>> Yeah, the follower should include the partition if the 
> > >>>>>>>>> logStartOffset
> > >>>>>>>>> has changed.  That should be spelled out on the KIP.  Fixed.
> > >>>>>>>>>
> > >>>>>>>>>> 3. In the section "Per-Partition Data", a partition is not 
> > >>>>>>>>>> considered
> > >>>>>>>>>> dirty if its log start offset has changed. Later in the section
> > >>>>>>>>> "FetchRequest
> > >>>>>>>>>> Changes", it is said that incremental fetch responses will 
> > >>>>>>>>>> include a
> > >>>>>>>>>> partition if its logStartOffset has changed. It seems 
> > >>>>>>>>>> inconsistent.
> > >>>>>>> Can
> > >>>>>>>>>> you update the KIP to clarify it?
> > >>>>>>>>>>
> > >>>>>>>>> In the "Per-Partition Data" section, it does say that 
> > >>>>>>>>> logStartOffset
> > >>>>>>>>> changes make a partition dirty, though, right?  The first bullet 
> > >>>>>>>>> point
> > >>>>>>>>> is:
> > >>>>>>>>>
> > >>>>>>>>>> * The LogCleaner deletes messages, and this changes the log start
> > >>>>>>> offset
> > >>>>>>>>> of the partition on the leader., or
> > >>>>>>>>>
> > >>>>>>>> Ah I see. I think I didn't notice this because statement assumes 
> > >>>>>>>> that the
> > >>>>>>>> LogStartOffset in the leader only changes due to LogCleaner. In 
> > >>>>>>>> fact the
> > >>>>>>>> LogStartOffset can change on the leader due to either log 
> > >>>>>>>> retention and
> > >>>>>>>> DeleteRecordsRequest. I haven't verified whether LogCleaner can 
> > >>>>>>>> change
> > >>>>>>>> LogStartOffset though. It may be a bit better to just say that a
> > >>>>>>>> partition is considered dirty if LogStartOffset changes.
> > >>>>>>> I agree.  It should be straightforward to just resend the partition 
> > >>>>>>> if
> > >>>>>>> logStartOffset changes.
> > >>>>>>>
> > >>>>>>>>>> 4. In "Fetch Session Caching" section, it is said that each 
> > >>>>>>>>>> broker
> > >>>>>>> has a
> > >>>>>>>>>> limited number of slots. How is this number determined? Does this
> > >>>>>>> require
> > >>>>>>>>>> a new broker config for this number?
> > >>>>>>>>> Good point.  I added two broker configuration parameters to 
> > >>>>>>>>> control
> > >>>>>>> this
> > >>>>>>>>> number.
> > >>>>>>>>>
> > >>>>>>>> I am curious to see whether we can avoid some of these new 
> > >>>>>>>> configs. For
> > >>>>>>>> example, incremental.fetch.session.cache.slots.per.broker is 
> > >>>>>>>> probably
> > >>>>>>> not
> > >>>>>>>> necessary because if a leader knows that a FetchRequest comes from 
> > >>>>>>>> a
> > >>>>>>>> follower, we probably want the leader to always cache the 
> > >>>>>>>> information
> > >>>>>>>> from that follower. Does this make sense?
> > >>>>>>> Yeah, maybe we can avoid having
> > >>>>>>> incremental.fetch.session.cache.slots.per.broker.
> > >>>>>>>
> > >>>>>>>> Maybe we can discuss the config later after there is agreement on 
> > >>>>>>>> how the
> > >>>>>>>> protocol would look like.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>>> What is the error code if broker does
> > >>>>>>>>>> not have new log for the incoming FetchRequest?
> > >>>>>>>>> Hmm, is there a typo in this question?  Maybe you meant to ask 
> > >>>>>>>>> what
> > >>>>>>>>> happens if there is no new cache slot for the incoming 
> > >>>>>>>>> FetchRequest?
> > >>>>>>>>> That's not an error-- the incremental fetch session ID just gets 
> > >>>>>>>>> set to
> > >>>>>>>>> 0, indicating no incremental fetch session was created.
> > >>>>>>>>>
> > >>>>>>>> Yeah there is a typo. You have answered my question.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>>> 5. Can you clarify what happens if follower adds a partition to 
> > >>>>>>>>>> the
> > >>>>>>>>>> ReplicaFetcherThread after receiving LeaderAndIsrRequest? Does 
> > >>>>>>>>>> leader
> > >>>>>>>>>> needs to generate a new session for this ReplicaFetcherThread or
> > >>>>>>> does it
> > >>>>>>>>> re-use
> > >>>>>>>>>> the existing session?  If it uses a new session, is the old 
> > >>>>>>>>>> session
> > >>>>>>>>>> actively deleted from the slot?
> > >>>>>>>>> The basic idea is that you can't make changes, except by sending 
> > >>>>>>>>> a full
> > >>>>>>>>> fetch request.  However, perhaps we can allow the client to 
> > >>>>>>>>> re-use its
> > >>>>>>>>> existing session ID.  If the client sets sessionId = id, epoch = 
> > >>>>>>>>> 0, it
> > >>>>>>>>> could re-initialize the session.
> > >>>>>>>>>
> > >>>>>>>> Yeah I agree with the basic idea. We probably want to understand 
> > >>>>>>>> more
> > >>>>>>>> detail about how this works later.
> > >>>>>>> Sounds good.  I updated the KIP with this information.  A
> > >>>>>>> re-initialization should be exactly the same as an initialization,
> > >>>>>>> except that it reuses an existing ID.
> > >>>>>>>
> > >>>>>>> best,
> > >>>>>>> Colin
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>>>> BTW, I think it may be useful if the KIP can include the example
> > >>>>>>> workflow
> > >>>>>>>>>> of how this feature will be used in case of partition change and 
> > >>>>>>>>>> so
> > >>>>>>> on.
> > >>>>>>>>> Yeah, that might help.
> > >>>>>>>>>
> > >>>>>>>>> best,
> > >>>>>>>>> Colin
> > >>>>>>>>>
> > >>>>>>>>>> Thanks,
> > >>>>>>>>>> Dong
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> On Wed, Nov 29, 2017 at 12:13 PM, Colin 
> > >>>>>>>>>> McCabe<cmcc...@apache.org>
> > >>>>>>>>>> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> I updated the KIP with the ideas we've been discussing.
> > >>>>>>>>>>>
> > >>>>>>>>>>> best,
> > >>>>>>>>>>> Colin
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Tue, Nov 28, 2017, at 08:38, Colin McCabe wrote:
> > >>>>>>>>>>>> On Mon, Nov 27, 2017, at 22:30, Jan Filipiak wrote:
> > >>>>>>>>>>>>> Hi Colin, thank you  for this KIP, it can become a really
> > >>>>>>> useful
> > >>>>>>>>> thing.
> > >>>>>>>>>>>>> I just scanned through the discussion so far and wanted to
> > >>>>>>> start a
> > >>>>>>>>>>>>> thread to make as decision about keeping the
> > >>>>>>>>>>>>> cache with the Connection / Session or having some sort of 
> > >>>>>>>>>>>>> UUID
> > >>>>>>>>> indN
> > >>>>>>>>>>> exed
> > >>>>>>>>>>>>> global Map.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Sorry if that has been settled already and I missed it. In 
> > >>>>>>>>>>>>> this
> > >>>>>>>>> case
> > >>>>>>>>>>>>> could anyone point me to the discussion?
> > >>>>>>>>>>>> Hi Jan,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I don't think anyone has discussed the idea of tying the cache
> > >>>>>>> to an
> > >>>>>>>>>>>> individual TCP session yet.  I agree that since the cache is
> > >>>>>>>>> intended to
> > >>>>>>>>>>>> be used only by a single follower or client, it's an 
> > >>>>>>>>>>>> interesting
> > >>>>>>>>> thing
> > >>>>>>>>>>>> to think about.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I guess the obvious disadvantage is that whenever your TCP
> > >>>>>>> session
> > >>>>>>>>>>>> drops, you have to make a full fetch request rather than an
> > >>>>>>>>> incremental
> > >>>>>>>>>>>> one.  It's not clear to me how often this happens in practice 
> > >>>>>>>>>>>> --
> > >>>>>>> it
> > >>>>>>>>>>>> probably depends a lot on the quality of the network.  From a
> > >>>>>>> code
> > >>>>>>>>>>>> perspective, it might also be a bit difficult to access data
> > >>>>>>>>> associated
> > >>>>>>>>>>>> with the Session from classes like KafkaApis (although we could
> > >>>>>>>>> refactor
> > >>>>>>>>>>>> it to make this easier).
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> It's also clear that even if we tie the cache to the session, 
> > >>>>>>>>>>>> we
> > >>>>>>>>> still
> > >>>>>>>>>>>> have to have limits on the number of caches we're willing to
> > >>>>>>> create.
> > >>>>>>>>>>>> And probably we should reserve some cache slots for each
> > >>>>>>> follower, so
> > >>>>>>>>>>>> that clients don't take all of them.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> Id rather see a protocol in which the client is hinting the
> > >>>>>>> broker
> > >>>>>>>>>>> that,
> > >>>>>>>>>>>>> he is going to use the feature instead of a client
> > >>>>>>>>>>>>> realizing that the broker just offered the feature (regardless
> > >>>>>>> of
> > >>>>>>>>>>>>> protocol version which should only indicate that the feature
> > >>>>>>>>>>>>> would be usable).
> > >>>>>>>>>>>> Hmm.  I'm not sure what you mean by "hinting."  I do think that
> > >>>>>>> the
> > >>>>>>>>>>>> server should have the option of not accepting incremental
> > >>>>>>> requests
> > >>>>>>>>> from
> > >>>>>>>>>>>> specific clients, in order to save memory space.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> This seems to work better with a per
> > >>>>>>>>>>>>> connection/session attached Metadata than with a Map and could
> > >>>>>>>>> allow
> > >>>>>>>>>>> for
> > >>>>>>>>>>>>> easier client implementations.
> > >>>>>>>>>>>>> It would also make Client-side code easier as there wouldn't
> > >>>>>>> be any
> > >>>>>>>>>>>>> Cache-miss error Messages to handle.
> > >>>>>>>>>>>> It is nice not to have to handle cache-miss responses, I agree.
> > >>>>>>>>>>>> However, TCP sessions aren't exposed to most of our client-side
> > >>>>>>> code.
> > >>>>>>>>>>>> For example, when the Producer creates a message and hands it
> > >>>>>>> off to
> > >>>>>>>>> the
> > >>>>>>>>>>>> NetworkClient, the NC will transparently re-connect and 
> > >>>>>>>>>>>> re-send a
> > >>>>>>>>>>>> message if the first send failed.  The higher-level code will
> > >>>>>>> not be
> > >>>>>>>>>>>> informed about whether the TCP session was re-established,
> > >>>>>>> whether an
> > >>>>>>>>>>>> existing TCP session was used, and so on.  So overall I would
> > >>>>>>> still
> > >>>>>>>>> lean
> > >>>>>>>>>>>> towards not coupling this to the TCP session...
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> best,
> > >>>>>>>>>>>> Colin
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>>      Thank you again for the KIP. And again, if this was 
> > >>>>>>>>>>>>> clarified
> > >>>>>>>>> already
> > >>>>>>>>>>>>> please drop me a hint where I could read about it.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Best Jan
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On 21.11.2017 22:02, Colin McCabe wrote:
> > >>>>>>>>>>>>>> Hi all,
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> I created a KIP to improve the scalability and latency of
> > >>>>>>>>>>> FetchRequest:
> > >>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > >>>>>>>>>>> 227%3A+Introduce+Incremental+FetchRequests+to+Increase+
> > >>>>>>>>>>> Partition+Scalability
> > >>>>>>>>>>>>>> Please take a look.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> cheers,
> > >>>>>>>>>>>>>> Colin
> >

Reply via email to