>>
>> The point I want to make is that avoiding the binary search on the index
>> file and avoiding reads of the log segments during fetch has some
>> additional benefits. So if that solution works for the current KIP, it
>> might be a better choice.

>Let's discuss this in a follow-on KIP.

If the discussion could change the protocol in the current proposal,
would it be better to have it now rather than in a follow-up KIP, so that
we don't end up with a protocol that immediately requires a change?


On Tue, Dec 19, 2017 at 9:26 AM, Colin McCabe <co...@cmccabe.xyz> wrote:

> On Tue, Dec 19, 2017, at 02:16, Jan Filipiak wrote:
> > Sorry for coming back at this so late.
> >
> >
> >
> > On 11.12.2017 07:12, Colin McCabe wrote:
> > > On Sun, Dec 10, 2017, at 22:10, Colin McCabe wrote:
> > >> On Fri, Dec 8, 2017, at 01:16, Jan Filipiak wrote:
> > >>> Hi,
> > >>>
> > >>> sorry for the late reply, busy times :-/
> > >>>
> > >>> I would ask you one thing maybe. Since the timeout
> > >>> argument seems to be settled I have no further argument
> > >>> from your side except the "I don't want to".
> > >>>
> > >>> Can you see that connections.max.idle.ms is the exact time
> > >>> that expresses "We expect the client to be away for this long,
> > >>> and come back and continue"?
> > >> Hi Jan,
> > >>
> > >> Sure, connections.max.idle.ms is the exact time that we want to keep
> > >> around a TCP session.  TCP sessions are relatively cheap, so we can
> > >> afford to keep them around for 10 minutes by default.  Incremental
> > >> fetch state is less cheap, so we want to set a shorter timeout for it.
> > >> We also want new TCP sessions to be able to reuse an existing
> > >> incremental fetch session rather than creating a new one and waiting
> > >> for the old one to time out.
> > >>
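To make the distinction in the paragraph above concrete, here is a minimal
sketch of a fetch-session cache whose expiry is independent of any TCP
connection. It is not the KIP's implementation. The class name, the slot
limit and the timeout value are assumptions chosen for illustration; the
only detail taken from the thread is that a session ID of 0 means "no
incremental fetch session was created".

```python
import itertools


class FetchSessionCache:
    """Hypothetical broker-side cache of incremental fetch sessions."""

    def __init__(self, max_slots, session_timeout_s):
        self.max_slots = max_slots
        self.session_timeout_s = session_timeout_s
        self._last_used = {}                 # session_id -> time of last use
        self._next_id = itertools.count(1)   # 0 is reserved for "no session"

    def _expire(self, now):
        # Drop sessions that have been idle for longer than the timeout.
        stale = [sid for sid, used in self._last_used.items()
                 if now - used > self.session_timeout_s]
        for sid in stale:
            del self._last_used[sid]

    def create(self, now):
        """Return a new session ID, or 0 if no slot is free."""
        self._expire(now)
        if len(self._last_used) >= self.max_slots:
            return 0
        sid = next(self._next_id)
        self._last_used[sid] = now
        return sid

    def touch(self, session_id, now):
        """Reuse a session from any connection; False if it has expired."""
        self._expire(now)
        if session_id not in self._last_used:
            return False
        self._last_used[session_id] = now
        return True
```

Because lookups are keyed by session ID rather than by connection, a
follower that reconnects within the session timeout can keep using its
session, while an idle session frees its slot long before the 10-minute
connection idle timeout would.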
> > >>> also clarified some stuff inline
> > >>>
> > >>> Best Jan
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> On 05.12.2017 23:14, Colin McCabe wrote:
> > >>>> On Tue, Dec 5, 2017, at 13:13, Jan Filipiak wrote:
> > >>>>> Hi Colin
> > >>>>>
> > >>>>> Addressing the topic of how to manage slots from the other thread.
> > >>>>> With tcp connections all this comes for free essentially.
> > >>>> Hi Jan,
> > >>>>
> > >>>> I don't think that it's accurate to say that cache management
> > >>>> "comes for free" by coupling the incremental fetch session with the
> > >>>> TCP session.  When a new TCP session is started by a fetch request,
> > >>>> you still have to decide whether to grant that request an
> > >>>> incremental fetch session or not.  If your answer is that you always
> > >>>> grant the request, I would argue that you do not have cache
> > >>>> management.
> > >>> First I would say, the client has a big say in this. If the client
> > >>> is not going to issue incremental fetches, it shouldn't ask for a
> > >>> cache. When the client asks for the cache, we still have all options
> > >>> to deny.
> > >> To put it simply, we have to have some cache management above and
> > >> beyond just giving out an incremental fetch session to anyone who has
> > >> a TCP session.  Therefore, caching does not become simpler if you
> > >> couple the fetch session to the TCP session.
> > Simply giving out a fetch session to everyone with a connection is too
> > simple, but I think it plays well into the idea of consumers choosing
> > to use the feature, therefore only enabling it where it brings maximum
> > gains (replicas, MirrorMakers).
> > >>
> > >>>> I guess you could argue that timeouts are cache management, but I
> > >>>> don't find that argument persuasive.  Anyone could just create a lot
> > >>>> of TCP sessions and use a lot of resources, in that case.  So there
> > >>>> is essentially no limit on memory use.  In any case, TCP sessions
> > >>>> don't help us implement fetch session timeouts.
> > >>> We still have all the options for denying the request to keep the state.
> > >>> What you want seems like a max connections / IP safeguard.
> > >>> I can currently take down a broker with too many connections easily.
> > >>>
> > >>>
> > >>>>> I still would argue we disable it by default and make a flag in the
> > >>>>> broker to ask the leader to maintain the cache while replicating,
> > >>>>> and also only have it optional in consumers (default to off) so one
> > >>>>> can turn it on where it really hurts.  MirrorMaker and audit
> > >>>>> consumers prominently.
> > >>>> I agree with Jason's point from earlier in the thread.  Adding extra
> > >>>> configuration knobs that aren't really necessary can harm usability.
> > >>>> Certainly asking people to manually turn on a feature "where it
> > >>>> really hurts" seems to fall in that category, when we could easily
> > >>>> enable it automatically for them.
> > >>> This doesn't make much sense to me.
> > >> There are no tradeoffs to think about from the client's point of view:
> > >> it always wants an incremental fetch session.  So there is no benefit
> > >> to making the clients configure an extra setting.  Updating and managing
> > >> client configurations is also more difficult than managing broker
> > >> configurations for most users.
> > >>
> > >>> You also wanted to implement
> > >>> a "turn off in case of bug"-knob. Having the client indicate if the
> > >>> feature will be used seems reasonable to me.
> > >> True.  However, if there is a bug, we could also roll back the client,
> > >> so having this configuration knob is not strictly required.
> > >>
> > >>>>> Otherwise I left a few remarks in-line, which should help to
> > >>>>> understand my view of the situation better
> > >>>>>
> > >>>>> Best Jan
> > >>>>>
> > >>>>>
> > >>>>> On 05.12.2017 08:06, Colin McCabe wrote:
> > >>>>>> On Mon, Dec 4, 2017, at 02:27, Jan Filipiak wrote:
> > >>>>>>> On 03.12.2017 21:55, Colin McCabe wrote:
> > >>>>>>>> On Sat, Dec 2, 2017, at 23:21, Becket Qin wrote:
> > >>>>>>>>> Thanks for the explanation, Colin. A few more questions.
> > >>>>>>>>>
> > >>>>>>>>>> The session epoch is not complex.  It's just a number which increments
> > >>>>>>>>>> on each incremental fetch.  The session epoch is also useful for
> > >>>>>>>>>> debugging-- it allows you to match up requests and responses when
> > >>>>>>>>>> looking at log files.
> > >>>>>>>>> Currently each request in Kafka has a correlation id to help match the
> > >>>>>>>>> requests and responses. Is epoch doing something differently?
> > >>>>>>>> Hi Becket,
> > >>>>>>>>
> > >>>>>>>> The correlation ID is used within a single TCP session, to uniquely
> > >>>>>>>> associate a request with a response.  The correlation ID is not unique
> > >>>>>>>> (and has no meaning) outside the context of that single TCP session.
> > >>>>>>>>
> > >>>>>>>> Keep in mind, NetworkClient is in charge of TCP sessions, and generally
> > >>>>>>>> tries to hide that information from the upper layers of the code.  So
> > >>>>>>>> when you submit a request to NetworkClient, you don't know if that
> > >>>>>>>> request creates a TCP session, or reuses an existing one.
> > >>>>>>>>>> Unfortunately, this doesn't work.  Imagine the client misses an
> > >>>>>>>>>> incremental fetch response about a partition.  And then the partition is
> > >>>>>>>>>> never updated after that.  The client has no way to know about the
> > >>>>>>>>>> partition, since it won't be included in any future incremental fetch
> > >>>>>>>>>> responses.  And there are no offsets to compare, since the partition is
> > >>>>>>>>>> simply omitted from the response.
> > >>>>>>>>> I am curious about in which situation the follower would miss a response
> > >>>>>>>>> for a partition. If the entire FetchResponse is lost (e.g. timeout), the
> > >>>>>>>>> follower would disconnect and retry. That will result in sending a full
> > >>>>>>>>> FetchRequest.
> > >>>>>>>> Basically, you are proposing that we rely on TCP for reliable delivery
> > >>>>>>>> in a distributed system.  That isn't a good idea for a bunch of
> > >>>>>>>> different reasons.  First of all, TCP timeouts tend to be very long.  So
> > >>>>>>>> if the TCP session timing out is your error detection mechanism, you
> > >>>>>>>> have to wait minutes for messages to time out.  Of course, we add a
> > >>>>>>>> timeout on top of that after which we declare the connection bad and
> > >>>>>>>> manually close it.  But just because the session is closed on one end
> > >>>>>>>> doesn't mean that the other end knows that it is closed.  So the leader
> > >>>>>>>> may have to wait quite a long time before TCP decides that yes,
> > >>>>>>>> connection X from the follower is dead and not coming back, even though
> > >>>>>>>> gremlins ate the FIN packet which the follower attempted to transmit.
> > >>>>>>>> If the cache state is tied to that TCP session, we have to keep that
> > >>>>>>>> cache around for a much longer time than we should.
> > >>>>>>> Hi,
> > >>>>>>>
> > >>>>>>> I see this from a different perspective. The cache expiry time
> > >>>>>>> has the same semantic as idle connection time in this scenario.
> > >>>>>>> It is the time range in which we expect the client to come back and
> > >>>>>>> reuse its broker side state. I would argue that on close we would get
> > >>>>>>> an extra shot at cleaning up the session state early, as opposed to
> > >>>>>>> always waiting that duration for expiry to happen.
> > >>>>>> Hi Jan,
> > >>>>>>
> > >>>>>> The idea here is that the incremental fetch cache expiry time can be
> > >>>>>> much shorter than the TCP session timeout.  In general the TCP session
> > >>>>>> timeout is common to all TCP connections, and very long.  To make these
> > >>>>>> numbers a little more concrete, the TCP session timeout is often
> > >>>>>> configured to be 2 hours on Linux.  (See
> > >>>>>> https://www.cyberciti.biz/tips/linux-increasing-or-decreasing-tcp-sockets-timeouts.html
> > >>>>>> )  The timeout I was proposing for incremental fetch sessions was one or
> > >>>>>> two minutes at most.
> > >>>>> Currently this is taken care of by
> > >>>>> connections.max.idle.ms on the broker, which defaults to a few
> > >>>>> minutes.
> > >>>> It is 10 minutes by default, which is longer than what we want the
> > >>>> incremental fetch session timeout to be.  There's no reason to couple
> > >>>> these two things.
> > >>>>
> > >>>>> Also something we could let the client change if we really wanted to.
> > >>>>> So there is no need to worry about coupling our implementation to some
> > >>>>> timeouts given by the OS. With TCP one always has full control over the
> > >>>>> worst times, plus one gets the extra shot at cleaning up early when the
> > >>>>> close comes through, which is the majority of the cases.
> > >>>> In the majority of cases, the TCP session will be re-established.  In
> > >>>> that case, we have to send a full fetch request rather than an
> > >>>> incremental fetch request.
> > >>> I actually have a hard time believing this. Do you have any numbers from
> > >>> any existing production system? Is it the virtualisation layer cutting
> > >>> all the connections?
> > >>> We see this only on application crashes and restarts, where the app needs
> > >>> to do the full fetch anyway,
> > >>> as it probably continues with stored offsets.
> > >> Yes, TCP connections get dropped.  It happens very often in production
> > >> clusters, actually.  When I was working on Hadoop, one of the most
> > >> common questions I heard from newcomers was "why do I see so many
> > >> EOFException messages in the logs"?  The other thing that happens a lot
> > >> is DNS outages or slowness.  Public clouds seem to have even more
> > >> unstable networks than the on-premise clusters.  I am not sure why that
> > >> is.
> > Hadoop has a wiki page on exactly this
> > https://wiki.apache.org/hadoop/EOFException
> >
> > Besides user errors, they have servers crashing and actual loss of
> > connection high on their list.
> > In the case of "server goes away" the cache goes with it, so there is
> > nothing to argue about the cache being reused by
> > a new connection.
> >
> > Can you make an argument for at which point the epoch would be updated
> > broker side to maximise re-use of the cache on
> > lost connections? In many cases the epoch would go out of sync and we
> > would need a full fetch anyway. Am I mistaken here?
>
> The current proposal is that the server can accept multiple requests in a
> row with the same sequence number.
>
> Colin
>
> >
> >
> >
> >
> > >>
> > >>>>>>>> Secondly, from a software engineering perspective, it's not a good idea
> > >>>>>>>> to try to tightly tie together TCP and our code.  We would have to
> > >>>>>>>> rework how we interact with NetworkClient so that we are aware of things
> > >>>>>>>> like TCP sessions closing or opening.  We would have to be careful to
> > >>>>>>>> preserve the ordering of incoming messages when doing things like
> > >>>>>>>> putting incoming requests on to a queue to be processed by multiple
> > >>>>>>>> threads.  It's just a lot of complexity to add, and there's no upside.
> > >>>>>>> I see the point here. And I had a small chat with Dong Lin already
> > >>>>>>> making me aware of this. I tried out the approaches and propose the
> > >>>>>>> following:
> > >>>>>>>
> > >>>>>>> The client starts and does a full fetch. It then does incremental fetches.
> > >>>>>>> The connection to the broker dies and is re-established by NetworkClient
> > >>>>>>> under the hood.
> > >>>>>>> The broker sees an incremental fetch without having state => returns
> > >>>>>>> error.
> > >>>>>>> Client sees the error, does a full fetch and goes back to incrementally
> > >>>>>>> fetching.
> > >>>>>>>
> > >>>>>>> Having this 1 additional error round trip is essentially the same as
> > >>>>>>> when something with the sessions or epoch changed unexpectedly for the
> > >>>>>>> client (say expiry).
> > >>>>>>>
> > >>>>>>> So it's nothing extra added, but the conditions are easier to evaluate,
> > >>>>>>> especially since we do everything with NetworkClient. Other implementers
> > >>>>>>> of the protocol are free to optimize this and not do the erroneous
> > >>>>>>> round trip on the new connection.
> > >>>>>>> It's a great plus that the client can know when the error is going to
> > >>>>>>> happen, instead of the server always having to report back if something
> > >>>>>>> changes unexpectedly for the client.
> > >>>>>> You are assuming that the leader and the follower agree that the TCP
> > >>>>>> session drops at the same time.  When there are network problems, this
> > >>>>>> may not be true.  The leader may still think the previous TCP session is
> > >>>>>> active.  In that case, we have to keep the incremental fetch session
> > >>>>>> state around until we learn otherwise (which could be up to that 2 hour
> > >>>>>> timeout I mentioned).  And if we get a new incoming incremental fetch
> > >>>>>> request, we can't assume that it replaces the previous one, because the
> > >>>>>> IDs will be different (the new one starts a new session).
> > >>>>> As mentioned, no reason to fear some time-outs out of our control
> > >>>>>>>> Imagine that I made an argument that client IDs are "complex" and should
> > >>>>>>>> be removed from our APIs.  After all, we can just look at the remote IP
> > >>>>>>>> address and TCP port of each connection.  Would you think that was a
> > >>>>>>>> good idea?  The client ID is useful when looking at logs.  For example,
> > >>>>>>>> if a rebalance is having problems, you want to know what clients were
> > >>>>>>>> having a problem.  So having the client ID field to guide you is
> > >>>>>>>> actually much less "complex" in practice than not having an ID.
> > >>>>>>> I still can't follow why the correlation ID will not help here.
> > >>>>>>> Correlating logs with it usually works great, even with primitive tools
> > >>>>>>> like grep.
> > >>>>>> The correlation ID does help somewhat, but certainly not as much as a
> > >>>>>> unique 64-bit ID.  The correlation ID is not unique in the broker, just
> > >>>>>> unique to a single NetworkClient.  Similarly, the correlation ID is not
> > >>>>>> unique on the client side, if there are multiple Consumers, etc.
> > >>>>> Can always bump entropy in correlation IDs, never had a problem
> > >>>>> of finding too many duplicates. Would be a different KIP though.
> > >>>>>>>> Similarly, if metadata responses had epoch numbers (simple incrementing
> > >>>>>>>> numbers), we would not have to debug problems like clients accidentally
> > >>>>>>>> getting old metadata from servers that had been partitioned off from the
> > >>>>>>>> network for a while.  Clients would know the difference between old and
> > >>>>>>>> new metadata.  So putting epochs in to the metadata request is much less
> > >>>>>>>> "complex" operationally, even though it's an extra field in the request.
> > >>>>>>>> This has been discussed before on the mailing list.
> > >>>>>>>>
> > >>>>>>>> So I think the bottom line for me is that having the session ID and
> > >>>>>>>> session epoch, while it adds two extra fields, reduces operational
> > >>>>>>>> complexity and increases debuggability.  It avoids tightly coupling us
> > >>>>>>>> to assumptions about reliable ordered delivery which tend to be violated
> > >>>>>>>> in practice in multiple layers of the stack.  Finally, it avoids the
> > >>>>>>>> necessity of refactoring NetworkClient.
> > >>>>>>> So there are stacks out there that violate TCP guarantees? And software
> > >>>>>>> still works? How can this be? Can you elaborate a little on where this
> > >>>>>>> can be violated? I am not very familiar with virtualized environments
> > >>>>>>> but they can't really violate TCP contracts.
> > >>>>>> TCP's guarantees of reliable, in-order transmission certainly can be
> > >>>>>> violated.  For example, I once had to debug a cluster where a certain
> > >>>>>> node had a network card which corrupted its transmissions occasionally.
> > >>>>>> With all the layers of checksums, you would think that this was not
> > >>>>>> possible, but it happened.  We occasionally got corrupted data written
> > >>>>>> to disk on the other end because of it.  Even more frustrating, the data
> > >>>>>> was not corrupted on disk on the sending node-- it was a bug in the
> > >>>>>> network card driver that was injecting the errors.
> > >>>>> True, but your broker might as well read a corrupted 600 GB as the size
> > >>>>> from the network and die with OOM instantly.
> > >>>> If you read 600 GB as the size from the network, you will not "die with
> > >>>> OOM instantly."  That would be a bug.  Instead, you will notice that 600
> > >>>> GB is greater than max.message.bytes, and close the connection.
> > >>> We only check max.message.bytes to late to guard against consumer
> > >>> stalling.
> > >>> We don't have a notion of max.networkpacket.size before we allocate the
> > >>> bytebuffer to read it into.
> > >> "network packets" are not the same thing as "kafka RPCs."  One Kafka RPC
> > >> could take up multiple ethernet packets.
> > >>
> > >> Also, max.message.bytes has nothing to do with "consumer stalling" --
> > >> you are probably thinking about some of the fetch request
> > >> configurations.  max.message.bytes is used by the RPC system to figure
> > >> out whether to read the full incoming RP
> > > Whoops, this is incorrect.  I was thinking about
> > > "socket.request.max.bytes" rather than "max.message.bytes."  Sorry about
> > > that.  See Ismael's email as well.
> > >
> > > best,
> > > Colin
> > >
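For readers following the size-check argument above, the general technique
is to validate the length prefix before allocating a buffer for the body.
The following is only a sketch of that idea, not Kafka's network code. The
function name and the exception choices are made up for illustration, and
`max_request_bytes` stands in for a limit such as socket.request.max.bytes.

```python
import io
import struct


def read_request(stream, max_request_bytes):
    """Read one length-prefixed request, rejecting bad sizes before allocating."""
    header = stream.read(4)
    if len(header) < 4:
        raise EOFError("connection closed before the size prefix was read")
    (size,) = struct.unpack(">i", header)    # 4-byte big-endian signed size
    if size < 0 or size > max_request_bytes:
        # A corrupted or hostile size is rejected here; no buffer of that
        # size is ever allocated, and the caller can close the connection.
        raise ValueError(f"invalid request size {size}")
    body = stream.read(size)
    if len(body) < size:
        raise EOFError("connection closed mid-request")
    return body


# Usage: a well-formed 3-byte request passes the check.
good = io.BytesIO(struct.pack(">i", 3) + b"abc")
payload = read_request(good, max_request_bytes=100)
```

With a check like this, a corrupted size field leads to a closed
connection rather than an out-of-memory error.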
> > >> best,
> > >> Colin
> > >>
> > >>>>> Optimizing for still having functional
> > >>>>> software under these circumstances is not reasonable.
> > >>>>> You want to get rid of such a
> > >>>>> node ASAP and pray that ZooKeeper's ticks get corrupted often enough
> > >>>>> that it finally drops out of the cluster.
> > >>>>>
> > >>>>> There is a good reason that these kinds of things
> > >>>>> https://issues.apache.org/jira/browse/MESOS-4105
> > >>>>> don't end up as Kafka Jiras. In the end you can't run any software in
> > >>>>> these containers anymore. Application layer checksums are a neat thing to
> > >>>>> fail fast, but trying to cope with this probably causes more harm than
> > >>>>> good.  So I would argue that we shouldn't try this for the fetch requests.
> > >>>> One of the goals of Apache Kafka is to be "a streaming platform...
> > >>>> [that] lets you store streams of records in a fault-tolerant way."  For
> > >>>> more information, see https://kafka.apache.org/intro .  Fault-tolerance
> > >>>> is explicitly part of the goal of Kafka.  Prayer should be optional, not
> > >>>> required, when running the software.
> > >>> Yes, we need to fail ASAP when we read corrupted packets. It seemed
> > >>> to me like you tried to make the case for pray and try to stay alive.
> > >>> Fault tolerance here means: I am a fishy box, I am going to let a good
> > >>> box handle it and be silent until I get fixed up.
> > >>>> Crashing because someone sent you a bad packet is not reasonable
> > >>>> behavior.  It is a bug.  Similarly, bringing down the whole cluster,
> > >>>> which could be a hundred nodes, because someone had a bad network adapter
> > >>>> is not reasonable behavior.  It is perhaps reasonable for the cluster to
> > >>>> perform worse when hardware is having problems.  But that's a different
> > >>>> discussion.
> > >>> See above.
> > >>>> best,
> > >>>> Colin
> > >>>>
> > >>>>>> However, my point was not about TCP's guarantees being violated.  My
> > >>>>>> point is that TCP's guarantees are only one small building block to
> > >>>>>> build a robust distributed system.  TCP basically just says that if you
> > >>>>>> get any bytes from the stream, you will get the ones that were sent by
> > >>>>>> the sender, in the order they were sent.  TCP does not guarantee that
> > >>>>>> the bytes you send will get there.  It does not guarantee that if you
> > >>>>>> close the connection, the other end will know about it in a timely
> > >>>>>> fashion.
> > >>>>> These are very powerful guarantees and since we use TCP we should
> > >>>>> piggyback everything that is reasonable on to it. IMO there is no
> > >>>>> need to reimplement correct sequencing again if you get that from
> > >>>>> your transport layer. It saves you the complexity, it makes
> > >>>>> your application behave way more naturally and your API easier to
> > >>>>> understand.
> > >>>>>
> > >>>>> There is literally nothing the kernel won't let you decide,
> > >>>>> especially not any timings. The only noticeable exception is TIME_WAIT
> > >>>>> of usually 240 seconds, but that already has little to do with the broker
> > >>>>> itself, and
> > >>>>> if we are running out of usable ports because of this then expiring
> > >>>>> fetch requests
> > >>>>> won't help much anyway.
> > >>>>>
> > >>>>> I hope I could strengthen the trust you have in userland TCP connection
> > >>>>> management. It is really powerful and can be exploited for maximum gains
> > >>>>> without much risk in my opinion.
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>>> It does not guarantee that the bytes will be received in a
> > >>>>>> certain timeframe, and certainly doesn't guarantee that if you send a
> > >>>>>> byte on connection X and then on connection Y, that the remote end will
> > >>>>>> read a byte on X before reading a byte on Y.
> > >>>>> No one expects this from two independent paths of any kind.
> > >>>>>
> > >>>>>> best,
> > >>>>>> Colin
> > >>>>>>
> > >>>>>>> Hope this made my view clearer, especially the first part.
> > >>>>>>>
> > >>>>>>> Best Jan
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>> best,
> > >>>>>>>> Colin
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>> If an error such as NotLeaderForPartition is
> > >>>>>>>>> returned for some partitions, the follower can always send a full
> > >>>>>>>>> FetchRequest. Is there a scenario in which only some of the partitions
> > >>>>>>>>> in a FetchResponse are lost?
> > >>>>>>>>>
> > >>>>>>>>> Thanks,
> > >>>>>>>>>
> > >>>>>>>>> Jiangjie (Becket) Qin
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On Sat, Dec 2, 2017 at 2:37 PM, Colin McCabe <cmcc...@apache.org> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> On Fri, Dec 1, 2017, at 11:46, Dong Lin wrote:
> > >>>>>>>>>>> On Thu, Nov 30, 2017 at 9:37 AM, Colin McCabe <cmcc...@apache.org> wrote:
> > >>>>>>>>>>>> On Wed, Nov 29, 2017, at 18:59, Dong Lin wrote:
> > >>>>>>>>>>>>> Hey Colin,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Thanks much for the update. I have a few questions below:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 1. I am not very sure that we need Fetch Session Epoch. It seems that
> > >>>>>>>>>>>>> Fetch Session Epoch is only needed to help leader distinguish between
> > >>>>>>>>>>>>> "a full fetch request" and "a full fetch request and request a new
> > >>>>>>>>>>>>> incremental fetch session". Alternatively, follower can also indicate
> > >>>>>>>>>>>>> "a full fetch request and request a new incremental fetch session" by
> > >>>>>>>>>>>>> setting Fetch Session ID to -1 without using Fetch Session Epoch. Does
> > >>>>>>>>>>>>> this make sense?
> > >>>>>>>>>>>> Hi Dong,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> The fetch session epoch is very important for ensuring correctness.  It
> > >>>>>>>>>>>> prevents corrupted or incomplete fetch data due to network reordering
> > >>>>>>>>>>>> or loss.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> For example, consider a scenario where the follower sends a fetch
> > >>>>>>>>>>>> request to the leader.  The leader responds, but the response is lost
> > >>>>>>>>>>>> because of network problems which affected the TCP session.  In that
> > >>>>>>>>>>>> case, the follower must establish a new TCP session and re-send the
> > >>>>>>>>>>>> incremental fetch request.  But the leader does not know that the
> > >>>>>>>>>>>> follower didn't receive the previous incremental fetch response.  It is
> > >>>>>>>>>>>> only the incremental fetch epoch which lets the leader know that it
> > >>>>>>>>>>>> needs to resend that data, and not data which comes afterwards.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> You could construct similar scenarios with message reordering,
> > >>>>>>>>>>>> duplication, etc.  Basically, this is a stateful protocol on an
> > >>>>>>>>>>>> unreliable network, and you need to know whether the follower got the
> > >>>>>>>>>>>> previous data you sent before you move on.  And you need to handle
> > >>>>>>>>>>>> issues like duplicated or delayed requests.  These issues do not affect
> > >>>>>>>>>>>> the full fetch request, because it is not stateful-- any full fetch
> > >>>>>>>>>>>> request can be understood and properly responded to in isolation.
> > >>>>>>>>>>>>
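The lost-response scenario described above can be sketched as follows.
This is one possible reading of the epoch rules discussed in this thread,
not the KIP's actual state machine. The class, the error constant and the
`build_response` callback are hypothetical; the behaviours taken from the
thread are that a repeated epoch gets the same data again and that
epoch = 0 (re)initializes the session.

```python
FULL_FETCH_REQUIRED = "FULL_FETCH_REQUIRED"   # hypothetical error marker


class FetchSession:
    """Hypothetical leader-side state for one incremental fetch session."""

    def __init__(self, session_id):
        self.session_id = session_id
        self.epoch = 0
        self.last_response = None

    def handle(self, request_epoch, build_response):
        if request_epoch == 0:
            # Full fetch: (re)initialize the session under the same ID.
            self.epoch = 0
            self.last_response = build_response()
            return self.last_response
        if request_epoch == self.epoch + 1:
            # The follower saw the previous response, so move on.
            self.epoch = request_epoch
            self.last_response = build_response()
            return self.last_response
        if request_epoch == self.epoch:
            # Same epoch again: the previous response was lost, resend it
            # instead of sending data which comes afterwards.
            return self.last_response
        # Reordered, duplicated or stale request: force a full fetch.
        return FULL_FETCH_REQUIRED
```

Nothing in this check depends on which TCP connection carried the request,
which is why a re-sent request on a new connection can still be answered
correctly.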
> > >>>>>>>>>>> Thanks for the explanation. This makes sense. On the other hand I would
> > >>>>>>>>>>> be interested in learning more about whether Becket's solution can help
> > >>>>>>>>>>> simplify the protocol by not having the epoch field and whether that is
> > >>>>>>>>>>> worth doing.
> > >>>>>>>>>> Hi Dong,
> > >>>>>>>>>>
> > >>>>>>>>>> I commented about this in the other thread.  A solution which doesn't
> > >>>>>>>>>> maintain session information doesn't work here.
> > >>>>>>>>>>
> > >>>>>>>>>>>>> 2. It is said that Incremental FetchRequest will include partitions whose
> > >>>>>>>>>>>>> fetch offset or maximum number of fetch bytes has been changed. If
> > >>>>>>>>>>>>> follower's logStartOffset of a partition has changed, should this
> > >>>>>>>>>>>>> partition also be included in the next FetchRequest to the leader?
> > >>>>>>>>>>>>> Otherwise, it may affect the handling of DeleteRecordsRequest because
> > >>>>>>>>>>>>> leader may not know the corresponding data has been deleted on the
> > >>>>>>>>>>>>> follower.
> > >>>>>>>>>>>> Yeah, the follower should include the partition if the logStartOffset
> > >>>>>>>>>>>> has changed.  That should be spelled out on the KIP.  Fixed.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> 3. In the section "Per-Partition Data", a partition is not considered
> > >>>>>>>>>>>>> dirty if its log start offset has changed. Later in the section
> > >>>>>>>>>>>>> "FetchRequest Changes", it is said that incremental fetch responses will
> > >>>>>>>>>>>>> include a partition if its logStartOffset has changed. It seems
> > >>>>>>>>>>>>> inconsistent. Can you update the KIP to clarify it?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>> In the "Per-Partition Data" section, it does say that logStartOffset
> > >>>>>>>>>>>> changes make a partition dirty, though, right?  The first bullet point
> > >>>>>>>>>>>> is:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> * The LogCleaner deletes messages, and this changes the log start
> > >>>>>>>>>>>>> offset of the partition on the leader., or
> > >>>>>>>>>>>>
> > >>>>>>>>>>> Ah I see. I think I didn't notice this because the statement assumes that the
> > >>>>>>>>>>> LogStartOffset in the leader only changes due to LogCleaner. In fact the
> > >>>>>>>>>>> LogStartOffset can change on the leader due to either log retention or
> > >>>>>>>>>>> DeleteRecordsRequest. I haven't verified whether LogCleaner can change
> > >>>>>>>>>>> LogStartOffset though. It may be a bit better to just say that a
> > >>>>>>>>>>> partition is considered dirty if LogStartOffset changes.
> > >>>>>>>>>> I agree.  It should be straightforward to just resend the partition if
> > >>>>>>>>>> logStartOffset changes.
> > >>>>>>>>>>
> > >>>>>>>>>>>>> 4. In "Fetch Session Caching" section, it is said that each broker has a
> > >>>>>>>>>>>>> limited number of slots. How is this number determined? Does this require
> > >>>>>>>>>>>>> a new broker config for this number?
> > >>>>>>>>>>>> Good point.  I added two broker configuration parameters to control this
> > >>>>>>>>>>>> number.
> > >>>>>>>>>>>>
> > >>>>>>>>>>> I am curious to see whether we can avoid some of these new configs. For
> > >>>>>>>>>>> example, incremental.fetch.session.cache.slots.per.broker is probably not
> > >>>>>>>>>>> necessary because if a leader knows that a FetchRequest comes from a
> > >>>>>>>>>>> follower, we probably want the leader to always cache the information
> > >>>>>>>>>>> from that follower. Does this make sense?
> > >>>>>>>>>> Yeah, maybe we can avoid having
> > >>>>>>>>>> incremental.fetch.session.cache.slots.per.broker.
> > >>>>>>>>>>
> > >>>>>>>>>>> Maybe we can discuss the config later after there is agreement on what the
> > >>>>>>>>>>> protocol would look like.
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>>> What is the error code if broker does
> > >>>>>>>>>>>>> not have new log for the incoming FetchRequest?
> > >>>>>>>>>>>> Hmm, is there a typo in this question?  Maybe you meant to ask what
> > >>>>>>>>>>>> happens if there is no new cache slot for the incoming FetchRequest?
> > >>>>>>>>>>>> That's not an error-- the incremental fetch session ID just gets set to
> > >>>>>>>>>>>> 0, indicating no incremental fetch session was created.
> > >>>>>>>>>>>>
> > >>>>>>>>>>> Yeah there is a typo. You have answered my question.
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>>> 5. Can you clarify what happens if follower adds a partition to the
> > >>>>>>>>>>>>> ReplicaFetcherThread after receiving LeaderAndIsrRequest? Does leader
> > >>>>>>>>>>>>> need to generate a new session for this ReplicaFetcherThread or does it
> > >>>>>>>>>>>>> re-use the existing session?  If it uses a new session, is the old session
> > >>>>>>>>>>>>> actively deleted from the slot?
> > >>>>>>>>>>>> The basic idea is that you can't make changes, except by sending a full
> > >>>>>>>>>>>> fetch request.  However, perhaps we can allow the client to re-use its
> > >>>>>>>>>>>> existing session ID.  If the client sets sessionId = id, epoch = 0, it
> > >>>>>>>>>>>> could re-initialize the session.
> > >>>>>>>>>>>>
> > >>>>>>>>>>> Yeah I agree with the basic idea. We probably want to understand more
> > >>>>>>>>>>> detail about how this works later.
> > >>>>>>>>>> Sounds good.  I updated the KIP with this information.  A
> > >>>>>>>>>> re-initialization should be exactly the same as an initialization,
> > >>>>>>>>>> except that it reuses an existing ID.
> > >>>>>>>>>>
> > >>>>>>>>>> best,
> > >>>>>>>>>> Colin
> > >>>>>>>>>>
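A minimal sketch of the behaviour described in the exchange above, under stated assumptions: a full fetch request either obtains a cache slot or gets session ID 0 back (not an error), and a full fetch with sessionId = id, epoch = 0 re-initializes an existing session while keeping its ID. The names FetchSessionCache, FetchSession, full_fetch and NO_SESSION are hypothetical and used for illustration only; they are not taken from the KIP or from the Kafka code base, and the thread only says that re-initialization "perhaps" could work this way.

```python
from dataclasses import dataclass
from typing import Dict, Set

# Assumption from the thread: session ID 0 in a response means
# "no incremental fetch session was created".
NO_SESSION = 0


@dataclass
class FetchSession:
    """Hypothetical per-session state: the partitions being fetched."""
    partitions: Set[str]


class FetchSessionCache:
    """Hypothetical broker-side cache with a fixed number of slots."""

    def __init__(self, slots: int) -> None:
        self.slots = slots
        self.sessions: Dict[int, FetchSession] = {}
        self._next_id = 1

    def full_fetch(self, session_id: int, epoch: int, partitions: Set[str]) -> int:
        """Handle a full fetch request; return the session ID for the response."""
        if session_id != NO_SESSION and epoch == 0 and session_id in self.sessions:
            # Re-initialization: same as initialization, but the existing ID is reused.
            self.sessions[session_id] = FetchSession(set(partitions))
            return session_id
        if len(self.sessions) >= self.slots:
            # No free cache slot: not an error, the response just carries ID 0.
            return NO_SESSION
        new_id = self._next_id
        self._next_id += 1
        self.sessions[new_id] = FetchSession(set(partitions))
        return new_id


cache = FetchSessionCache(slots=1)
first = cache.full_fetch(NO_SESSION, 0, {"topic-0"})           # takes the only slot
second = cache.full_fetch(NO_SESSION, 0, {"topic-1"})          # no slot left
reused = cache.full_fetch(first, 0, {"topic-0", "topic-1"})    # re-initializes in place
```

In this sketch a follower that adds a partition keeps its slot: it re-sends a full fetch under its existing ID instead of competing for a new slot.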
> > >>>>>>>>>>
> > >>>>>>>>>>>>> BTW, I think it may be useful if the KIP can include the
> example
> > >>>>>>>>>> workflow
> > >>>>>>>>>>>>> of how this feature will be used in case of partition
> change and so
> > >>>>>>>>>> on.
> > >>>>>>>>>>>> Yeah, that might help.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> best,
> > >>>>>>>>>>>> Colin
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>> Dong
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On Wed, Nov 29, 2017 at 12:13 PM, Colin McCabe<
> cmcc...@apache.org>
> > >>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> I updated the KIP with the ideas we've been discussing.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> best,
> > >>>>>>>>>>>>>> Colin
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On Tue, Nov 28, 2017, at 08:38, Colin McCabe wrote:
> > >>>>>>>>>>>>>>> On Mon, Nov 27, 2017, at 22:30, Jan Filipiak wrote:
> > >>>>>>>>>>>>>>>> Hi Colin, thank you  for this KIP, it can become a
> really
> > >>>>>>>>>> useful
> > >>>>>>>>>>>> thing.
> > >>>>>>>>>>>>>>>> I just scanned through the discussion so far and wanted
> to
> > >>>>>>>>>> start a
> > >>>>>>>>>>>>>>>> thread to make a decision about keeping the
> > >>>>>>>>>>>>>>>> cache with the Connection / Session or having some sort
> of UUID
> > >>>>>>>>>>>>>>>> indexed
> > >>>>>>>>>>>>>>>> global Map.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Sorry if that has been settled already and I missed it.
> In this
> > >>>>>>>>>>>> case
> > >>>>>>>>>>>>>>>> could anyone point me to the discussion?
> > >>>>>>>>>>>>>>> Hi Jan,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> I don't think anyone has discussed the idea of tying the
> cache
> > >>>>>>>>>> to an
> > >>>>>>>>>>>>>>> individual TCP session yet.  I agree that since the
> cache is
> > >>>>>>>>>>>> intended to
> > >>>>>>>>>>>>>>> be used only by a single follower or client, it's an
> interesting
> > >>>>>>>>>>>> thing
> > >>>>>>>>>>>>>>> to think about.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> I guess the obvious disadvantage is that whenever your
> TCP
> > >>>>>>>>>> session
> > >>>>>>>>>>>>>>> drops, you have to make a full fetch request rather than
> an
> > >>>>>>>>>>>> incremental
> > >>>>>>>>>>>>>>> one.  It's not clear to me how often this happens in
> practice --
> > >>>>>>>>>> it
> > >>>>>>>>>>>>>>> probably depends a lot on the quality of the network.
> From a
> > >>>>>>>>>> code
> > >>>>>>>>>>>>>>> perspective, it might also be a bit difficult to access
> data
> > >>>>>>>>>>>> associated
> > >>>>>>>>>>>>>>> with the Session from classes like KafkaApis (although
> we could
> > >>>>>>>>>>>> refactor
> > >>>>>>>>>>>>>>> it to make this easier).
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> It's also clear that even if we tie the cache to the
> session, we
> > >>>>>>>>>>>> still
> > >>>>>>>>>>>>>>> have to have limits on the number of caches we're
> willing to
> > >>>>>>>>>> create.
> > >>>>>>>>>>>>>>> And probably we should reserve some cache slots for each
> > >>>>>>>>>> follower, so
> > >>>>>>>>>>>>>>> that clients don't take all of them.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> I'd rather see a protocol in which the client is hinting
> the
> > >>>>>>>>>> broker
> > >>>>>>>>>>>>>> that,
> > >>>>>>>>>>>>>>>> he is going to use the feature instead of a client
> > >>>>>>>>>>>>>>>> realizing that the broker just offered the feature
> (regardless
> > >>>>>>>>>> of
> > >>>>>>>>>>>>>>>> protocol version which should only indicate that the
> feature
> > >>>>>>>>>>>>>>>> would be usable).
> > >>>>>>>>>>>>>>> Hmm.  I'm not sure what you mean by "hinting."  I do
> think that
> > >>>>>>>>>> the
> > >>>>>>>>>>>>>>> server should have the option of not accepting
> incremental
> > >>>>>>>>>> requests
> > >>>>>>>>>>>> from
> > >>>>>>>>>>>>>>> specific clients, in order to save memory space.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> This seems to work better with a per
> > >>>>>>>>>>>>>>>> connection/session attached Metadata than with a Map
> and could
> > >>>>>>>>>>>> allow
> > >>>>>>>>>>>>>> for
> > >>>>>>>>>>>>>>>> easier client implementations.
> > >>>>>>>>>>>>>>>> It would also make Client-side code easier as there
> wouldn't
> > >>>>>>>>>> be any
> > >>>>>>>>>>>>>>>> Cache-miss error Messages to handle.
> > >>>>>>>>>>>>>>> It is nice not to have to handle cache-miss responses, I
> agree.
> > >>>>>>>>>>>>>>> However, TCP sessions aren't exposed to most of our
> client-side
> > >>>>>>>>>> code.
> > >>>>>>>>>>>>>>> For example, when the Producer creates a message and
> hands it
> > >>>>>>>>>> off to
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>>> NetworkClient, the NC will transparently re-connect and
> re-send a
> > >>>>>>>>>>>>>>> message if the first send failed.  The higher-level code
> will
> > >>>>>>>>>> not be
> > >>>>>>>>>>>>>>> informed about whether the TCP session was
> re-established,
> > >>>>>>>>>> whether an
> > >>>>>>>>>>>>>>> existing TCP session was used, and so on.  So overall I
> would
> > >>>>>>>>>> still
> > >>>>>>>>>>>> lean
> > >>>>>>>>>>>>>>> towards not coupling this to the TCP session...
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> best,
> > >>>>>>>>>>>>>>> Colin
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>       Thank you again for the KIP. And again, if this
> was clarified
> > >>>>>>>>>>>> already
> > >>>>>>>>>>>>>>>> please drop me a hint where I could read about it.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Best Jan
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> On 21.11.2017 22:02, Colin McCabe wrote:
> > >>>>>>>>>>>>>>>>> Hi all,
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> I created a KIP to improve the scalability and latency
> of
> > >>>>>>>>>>>>>> FetchRequest:
> > >>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
> > >>>>>>>>>>>>>>>>> Please take a look.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> cheers,
> > >>>>>>>>>>>>>>>>> Colin
> >
>
>
