On Tue, Dec 5, 2017, at 13:13, Jan Filipiak wrote: > Hi Colin > > Addressing the topic of how to manage slots from the other thread. > With tcp connections all this comes for free essentially.
Hi Jan, I don't think that it's accurate to say that cache management "comes for free" by coupling the incremental fetch session with the TCP session. When a new TCP session is started by a fetch request, you still have to decide whether to grant that request an incremental fetch session or not. If your answer is that you always grant the request, I would argue that you do not have cache management. I guess you could argue that timeouts are cache management, but I don't find that argument persuasive. Anyone could just create a lot of TCP sessions and use a lot of resources, in that case. So there is essentially no limit on memory use. In any case, TCP sessions don't help us implement fetch session timeouts.
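Just to make the cache management point concrete, here is a rough sketch (in plain Java, with made-up names -- this is not the KIP's actual code) of the kind of admission decision the broker has to make when a fetch request asks for an incremental session:

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative sketch only: a bounded cache of incremental fetch sessions.
    // Granting a session is an explicit decision; when no slot is free and no
    // old session can be evicted, the broker declines and the fetcher just
    // keeps sending full fetch requests.
    public class FetchSessionCacheSketch {
        private final int maxSlots;
        private final long minIdleMsBeforeEviction;
        private final Map<Integer, Long> lastUsedMsBySessionId = new HashMap<>();
        private int nextSessionId = 1;

        public FetchSessionCacheSketch(int maxSlots, long minIdleMsBeforeEviction) {
            this.maxSlots = maxSlots;
            this.minIdleMsBeforeEviction = minIdleMsBeforeEviction;
        }

        // Returns a new session id, or 0 if the request should not get a session.
        public synchronized int maybeGrantSession(long nowMs) {
            if (lastUsedMsBySessionId.size() >= maxSlots && !tryEvictOne(nowMs)) {
                return 0;
            }
            int id = nextSessionId++;
            lastUsedMsBySessionId.put(id, nowMs);
            return id;
        }

        private boolean tryEvictOne(long nowMs) {
            Map.Entry<Integer, Long> oldest = null;
            for (Map.Entry<Integer, Long> e : lastUsedMsBySessionId.entrySet()) {
                if (oldest == null || e.getValue() < oldest.getValue()) {
                    oldest = e;
                }
            }
            // Only evict sessions that have been idle for the minimum lifetime.
            if (oldest != null && nowMs - oldest.getValue() >= minIdleMsBeforeEviction) {
                lastUsedMsBySessionId.remove(oldest.getKey());
                return true;
            }
            return false;
        }
    }

The question of who gets a slot, and when we take it away, has to be answered somewhere, whether or not the state happens to hang off a TCP connection.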
> I still would argue we disable it by default and make a flag in the > broker to ask the leader to maintain the cache while replicating and also only > have it optional in consumers (default to off) so one can turn it on > where it really hurts. MirrorMaker and audit consumers prominently. I agree with Jason's point from earlier in the thread. Adding extra configuration knobs that aren't really necessary can harm usability. Certainly asking people to manually turn on a feature "where it really hurts" seems to fall in that category, when we could easily enable it automatically for them. > > Otherwise I left a few remarks in-line, which should help to understand > my view of the situation better > > Best Jan > > > On 05.12.2017 08:06, Colin McCabe wrote: > > On Mon, Dec 4, 2017, at 02:27, Jan Filipiak wrote: > >> > >> On 03.12.2017 21:55, Colin McCabe wrote: > >>> On Sat, Dec 2, 2017, at 23:21, Becket Qin wrote: > >>>> Thanks for the explanation, Colin. A few more questions. > >>>> > >>>>> The session epoch is not complex. It's just a number which increments > >>>>> on each incremental fetch. The session epoch is also useful for > >>>>> debugging-- it allows you to match up requests and responses when > >>>>> looking at log files. > >>>> Currently each request in Kafka has a correlation id to help match the > >>>> requests and responses. Is epoch doing something differently? > >>> Hi Becket, > >>> > >>> The correlation ID is used within a single TCP session, to uniquely > >>> associate a request with a response. The correlation ID is not unique > >>> (and has no meaning) outside the context of that single TCP session. > >>> > >>> Keep in mind, NetworkClient is in charge of TCP sessions, and generally > >>> tries to hide that information from the upper layers of the code. So > >>> when you submit a request to NetworkClient, you don't know if that > >>> request creates a TCP session, or reuses an existing one. > >>>>> Unfortunately, this doesn't work. Imagine the client misses an > >>>>> incremental fetch response about a partition. And then the partition is > >>>>> never updated after that. The client has no way to know about the > >>>>> partition, since it won't be included in any future incremental fetch > >>>>> responses. And there are no offsets to compare, since the partition is > >>>>> simply omitted from the response. > >>>> I am curious about in which situation the follower would miss a response > >>>> for a partition. If the entire FetchResponse is lost (e.g. timeout), the > >>>> follower would disconnect and retry. That will result in sending a full > >>>> FetchRequest. > >>> Basically, you are proposing that we rely on TCP for reliable delivery > >>> in a distributed system. That isn't a good idea for a bunch of > >>> different reasons. First of all, TCP timeouts tend to be very long. So > >>> if the TCP session timing out is your error detection mechanism, you > >>> have to wait minutes for messages to time out. Of course, we add a > >>> timeout on top of that after which we declare the connection bad and > >>> manually close it. But just because the session is closed on one end > >>> doesn't mean that the other end knows that it is closed. So the leader > >>> may have to wait quite a long time before TCP decides that yes, > >>> connection X from the follower is dead and not coming back, even though > >>> gremlins ate the FIN packet which the follower attempted to transmit. > >>> If the cache state is tied to that TCP session, we have to keep that > >>> cache around for a much longer time than we should. > >> Hi, > >> > >> I see this from a different perspective. The cache expiry time > >> has the same semantics as idle connection time in this scenario. > >> It is the time range we expect the client to come back and reuse > >> its broker side state. I would argue that on close we would get an > >> extra shot at cleaning up the session state early. As opposed to > >> always waiting for that duration for expiry to happen. > > Hi Jan, > > > > The idea here is that the incremental fetch cache expiry time can be > > much shorter than the TCP session timeout. In general the TCP session > > timeout is common to all TCP connections, and very long. To make these > > numbers a little more concrete, the TCP session timeout is often > > configured to be 2 hours on Linux. (See > > https://www.cyberciti.biz/tips/linux-increasing-or-decreasing-tcp-sockets-timeouts.html > > ) The timeout I was proposing for incremental fetch sessions was one or > > two minutes at most. > Currently this is taken care of by > connections.max.idle.ms on the broker and defaults to something like a few > minutes. It is 10 minutes by default, which is longer than what we want the incremental fetch session timeout to be. There's no reason to couple these two things.
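Just to illustrate the decoupling I have in mind: the broker can expire incremental fetch session state on its own, much shorter timer, regardless of when connections.max.idle.ms fires or whether the TCP connection is even still open. A minimal sketch, with illustrative names and numbers only:

    import java.util.Iterator;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Sketch: fetch-session state expires on its own timer, independent of
    // TCP teardown or the broker's connection idle timeout.
    public class SessionExpirySketch {
        private final Map<Integer, Long> lastUsedMsBySessionId = new ConcurrentHashMap<>();
        private final long sessionTimeoutMs; // e.g. 120000 ms, vs. 600000 ms for connections.max.idle.ms

        public SessionExpirySketch(long sessionTimeoutMs) {
            this.sessionTimeoutMs = sessionTimeoutMs;
        }

        public void touch(int sessionId, long nowMs) {
            lastUsedMsBySessionId.put(sessionId, nowMs);
        }

        // Run periodically; drops stale sessions whether or not their connection is alive.
        public void expireStaleSessions(long nowMs) {
            Iterator<Map.Entry<Integer, Long>> it = lastUsedMsBySessionId.entrySet().iterator();
            while (it.hasNext()) {
                if (nowMs - it.next().getValue() > sessionTimeoutMs) {
                    it.remove();
                }
            }
        }
    }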
> Also something we could let the client change if we really wanted to. > So there is no need to worry about coupling our implementation to some > timeouts given by the OS; with TCP one always has full control over the worst > times, plus one gets the extra shot at cleaning up early when the close comes > through. > Which is the majority of the cases. In the majority of cases, the TCP session will be re-established. In that case, we have to send a full fetch request rather than an incremental fetch request.
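To spell out what that looks like from the client's side, here is a rough sketch of the fallback logic. The names and the "unknown session" signal are invented for the illustration -- they are not the actual protocol fields or error codes:

    // Sketch of a fetcher loop: send incremental fetches while we have a
    // session; if the broker reports that it no longer has our session state
    // (expired, evicted, or we reconnected to a broker that never had it),
    // drop back to one full fetch and then resume incremental fetching.
    public class FetcherSketch {
        private static final int NO_SESSION = 0;
        private int sessionId = NO_SESSION;
        private int epoch = 0;

        interface Broker {
            // Returns the granted session id (or NO_SESSION), or -1 for "unknown session".
            int fetch(int sessionId, int epoch, boolean fullFetch);
        }

        void pollOnce(Broker broker) {
            boolean fullFetch = (sessionId == NO_SESSION);
            int result = broker.fetch(sessionId, epoch, fullFetch);
            if (result < 0) {
                // Session unknown on the broker: reset and send a full fetch next time.
                sessionId = NO_SESSION;
                epoch = 0;
            } else {
                sessionId = result;
                epoch++;
            }
        }
    }

Note that the same path covers both the reconnect case and the case where the broker evicted or expired our session.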
> > > > >>> Secondly, from a software engineering perspective, it's not a good idea > >>> to try to tightly tie together TCP and our code. We would have to > >>> rework how we interact with NetworkClient so that we are aware of things > >>> like TCP sessions closing or opening. We would have to be careful to > >>> preserve the ordering of incoming messages when doing things like > >>> putting incoming requests on to a queue to be processed by multiple > >>> threads. It's just a lot of complexity to add, and there's no upside. > >> I see the point here. And I had a small chat with Dong Lin already > >> making me aware of this. I tried out the approaches and propose the > >> following: > >> > >> The client starts and does a full fetch. It then does incremental fetches. > >> The connection to the broker dies and is re-established by NetworkClient > >> under the hood. > >> The broker sees an incremental fetch without having state => returns > >> error: > >> Client sees the error, does a full fetch and goes back to incrementally > >> fetching. > >> > >> Having this one additional error round trip is essentially the same as > >> when something > >> with the session or epoch changed unexpectedly for the client (say > >> expiry). > >> > >> So it's nothing extra added but the conditions are easier to evaluate. > >> Especially since we do everything with NetworkClient. Other implementers > >> of the > >> protocol are free to optimize this and not do the erroneous > >> round trip on the > >> new connection. > >> It's a great plus that the client can know when the error is going to > >> happen, instead of > >> the server always having to report back if something changes > >> unexpectedly for the client. > > You are assuming that the leader and the follower agree that the TCP > > session drops at the same time. When there are network problems, this > > may not be true. The leader may still think the previous TCP session is > > active. In that case, we have to keep the incremental fetch session > > state around until we learn otherwise (which could be up to that 2 hour > > timeout I mentioned). And if we get a new incoming incremental fetch > > request, we can't assume that it replaces the previous one, because the > > IDs will be different (the new one starts a new session). > As mentioned, no reason to fear some time-outs out of our control > > > >>> Imagine that I made an argument that client IDs are "complex" and should > >>> be removed from our APIs. After all, we can just look at the remote IP > >>> address and TCP port of each connection. Would you think that was a > >>> good idea? The client ID is useful when looking at logs. For example, > >>> if a rebalance is having problems, you want to know what clients were > >>> having a problem. So having the client ID field to guide you is > >>> actually much less "complex" in practice than not having an ID. > >> I still can't follow why the correlation idea will not help here. > >> Correlating logs with it usually works great. Even with primitive tools > >> like grep > > The correlation ID does help somewhat, but certainly not as much as a > > unique 64-bit ID. The correlation ID is not unique in the broker, just > > unique to a single NetworkClient. Similarly, the correlation ID is not > > unique on the client side, if there are multiple Consumers, etc. > Can always bump entropy in correlation IDs, never had a problem > of finding too many duplicates. Would be a different KIP though. > > > >>> Similarly, if metadata responses had epoch numbers (simple incrementing > >>> numbers), we would not have to debug problems like clients accidentally > >>> getting old metadata from servers that had been partitioned off from the > >>> network for a while. Clients would know the difference between old and > >>> new metadata. So putting epochs into the metadata request is much less > >>> "complex" operationally, even though it's an extra field in the request. > >>> This has been discussed before on the mailing list. > >>> > >>> So I think the bottom line for me is that having the session ID and > >>> session epoch, while it adds two extra fields, reduces operational > >>> complexity and increases debuggability. It avoids tightly coupling us > >>> to assumptions about reliable ordered delivery which tend to be violated > >>> in practice in multiple layers of the stack. Finally, it avoids the > >>> necessity of refactoring NetworkClient. > >> So there are stacks out there that violate TCP guarantees? And software > >> still works? How can this be? Can you elaborate a little where this > >> can be violated? I am not very familiar with virtualized environments > >> but they can't really violate TCP contracts. > > TCP's guarantees of reliable, in-order transmission certainly can be > > violated. For example, I once had to debug a cluster where a certain > > node had a network card which corrupted its transmissions occasionally. > > With all the layers of checksums, you would think that this was not > > possible, but it happened. We occasionally got corrupted data written > > to disk on the other end because of it. Even more frustrating, the data > > was not corrupted on disk on the sending node-- it was a bug in the > > network card driver that was injecting the errors. > true, but your broker might as well read a corrupted 600 GB as the size from > the network and die with OOM instantly. If you read 600 GB as the size from the network, you will not "die with OOM instantly." That would be a bug. Instead, you will notice that 600 GB is greater than max.message.bytes, and close the connection.
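Concretely, the receive path should sanity-check any length field it reads off the wire before allocating anything. This is just a standalone illustration, not the broker's actual socket code; the limit is a parameter here rather than a specific config:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.SocketChannel;

    // Sketch: never trust a size field from the network. If it exceeds the
    // configured limit, close the connection instead of trying to allocate.
    public class SizeCheckedReceive {
        static ByteBuffer readSizeDelimited(SocketChannel channel, int maxBytes) throws IOException {
            ByteBuffer sizeBuf = ByteBuffer.allocate(4);
            while (sizeBuf.hasRemaining()) {
                if (channel.read(sizeBuf) < 0) throw new IOException("connection closed");
            }
            sizeBuf.flip();
            int size = sizeBuf.getInt();
            if (size < 0 || size > maxBytes) {
                // A corrupted or hostile size field: drop the connection rather than OOM.
                channel.close();
                throw new IOException("invalid request size " + size + " (limit " + maxBytes + ")");
            }
            ByteBuffer payload = ByteBuffer.allocate(size);
            while (payload.hasRemaining()) {
                if (channel.read(payload) < 0) throw new IOException("connection closed mid-message");
            }
            payload.flip();
            return payload;
        }
    }

A corrupted size field then costs one closed connection, not the whole broker.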
> Optimizing for still having functional > software under these circumstances is not reasonable. > You want to get rid of such a > node ASAP and pray that ZooKeeper's ticks get corrupted often enough > that it finally drops out of the cluster. > > There is a good reason that these kinda things > https://issues.apache.org/jira/browse/MESOS-4105 > don't end up as Kafka JIRAs. In the end you can't run any software in > these containers anymore. Application layer checksums are a neat thing to > fail fast but trying to cope with this probably causes more harm than > good. So I would argue that we shouldn't try this for the fetch requests. One of the goals of Apache Kafka is to be "a streaming platform... [that] lets you store streams of records in a fault-tolerant way." For more information, see https://kafka.apache.org/intro . Fault-tolerance is explicitly part of the goal of Kafka. Prayer should be optional, not required, when running the software. Crashing because someone sent you a bad packet is not reasonable behavior. It is a bug. Similarly, bringing down the whole cluster, which could be a hundred nodes, because someone had a bad network adapter is not reasonable behavior. It is perhaps reasonable for the cluster to perform worse when hardware is having problems. But that's a different discussion. best, Colin > > > > > > However, my point was not about TCP's guarantees being violated. My > > point is that TCP's guarantees are only one small building block to > > build a robust distributed system. TCP basically just says that if you > > get any bytes from the stream, you will get the ones that were sent by > > the sender, in the order they were sent. TCP does not guarantee that > > the bytes you send will get there. It does not guarantee that if you > > close the connection, the other end will know about it in a timely > > fashion. > These are very powerful guarantees and since we use TCP we should > piggyback everything that is reasonable on to it. IMO there is no > need to reimplement correct sequencing again if you get that from > your transport layer. It saves you the complexity, it makes > your application behave way more naturally and your API easier to > understand. > > There is literally nothing the kernel won't let you decide, > especially not any timings. The only noticeable exception is TIME_WAIT > of usually 240 seconds, but that already has little to do with the broker > itself, and > if we are running out of usable ports because of this then expiring > fetch requests > won't help much anyways. > > I hope I could strengthen the trust you have in userland TCP connection > management. It is really powerful and can be exploited for maximum gains > without much risk in my opinion. > > > > > It does not guarantee that the bytes will be received in a > > certain timeframe, and certainly doesn't guarantee that if you send a > > byte on connection X and then on connection Y, that the remote end will > > read a byte on X before reading a byte on Y. > No one expects this from two independent paths of any kind. > > > > > best, > > Colin > > > >> Hope this made my view clearer, especially the first part. > >> > >> Best Jan > >> > >> > >>> best, > >>> Colin > >>> > >>> > >>>> If there is an error such as NotLeaderForPartition is > >>>> returned for some partitions, the follower can always send a full > >>>> FetchRequest. Is there a scenario that only some of the partitions in a > >>>> FetchResponse are lost? > >>>> > >>>> Thanks, > >>>> > >>>> Jiangjie (Becket) Qin > >>>> > >>>> > >>>> On Sat, Dec 2, 2017 at 2:37 PM, Colin McCabe<cmcc...@apache.org> wrote: > >>>> > >>>>> On Fri, Dec 1, 2017, at 11:46, Dong Lin wrote: > >>>>>> On Thu, Nov 30, 2017 at 9:37 AM, Colin McCabe<cmcc...@apache.org> > >>>>> wrote: > >>>>>>> On Wed, Nov 29, 2017, at 18:59, Dong Lin wrote: > >>>>>>>> Hey Colin, > >>>>>>>> > >>>>>>>> Thanks much for the update. I have a few questions below: > >>>>>>>> > >>>>>>>> 1. I am not very sure that we need Fetch Session Epoch. It seems that > >>>>>>>> Fetch > >>>>>>>> Session Epoch is only needed to help leader distinguish between "a > >>>>> full > >>>>>>>> fetch request" and "a full fetch request and request a new > >>>>> incremental > >>>>>>>> fetch session". Alternatively, follower can also indicate "a full > >>>>> fetch > >>>>>>>> request and request a new incremental fetch session" by setting Fetch > >>>>>>>> Session ID to -1 without using Fetch Session Epoch. Does this make > >>>>> sense? > >>>>>>> Hi Dong, > >>>>>>> > >>>>>>> The fetch session epoch is very important for ensuring correctness. > >>>>>>> It > >>>>>>> prevents corrupted or incomplete fetch data due to network reordering > >>>>> or > >>>>>>> loss. > >>>>>>> > >>>>>>> For example, consider a scenario where the follower sends a fetch > >>>>>>> request to the leader. The leader responds, but the response is lost > >>>>>>> because of network problems which affected the TCP session. In that > >>>>>>> case, the follower must establish a new TCP session and re-send the > >>>>>>> incremental fetch request. But the leader does not know that the > >>>>>>> follower didn't receive the previous incremental fetch response. It > >>>>>>> is > >>>>>>> only the incremental fetch epoch which lets the leader know that it > >>>>>>> needs to resend that data, and not data which comes afterwards. > >>>>>>> > >>>>>>> You could construct similar scenarios with message reordering, > >>>>>>> duplication, etc. Basically, this is a stateful protocol on an > >>>>>>> unreliable network, and you need to know whether the follower got the > >>>>>>> previous data you sent before you move on. And you need to handle > >>>>>>> issues like duplicated or delayed requests. These issues do not > >>>>>>> affect > >>>>>>> the full fetch request, because it is not stateful-- any full fetch > >>>>>>> request can be understood and properly responded to in isolation.
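To make that last point concrete, the broker-side check amounts to "is this the next request I expect for this session?". A rough sketch, with illustrative names rather than the KIP's actual classes:

    import java.util.HashMap;
    import java.util.Map;

    // Sketch: an incremental fetch is only accepted if its epoch is exactly one
    // greater than the last epoch seen for that session, so delayed or
    // duplicated requests get rejected instead of silently corrupting state.
    public class EpochCheckSketch {
        enum Result { ACCEPT, INVALID_EPOCH, UNKNOWN_SESSION }

        private final Map<Integer, Integer> lastEpochBySessionId = new HashMap<>();

        synchronized void createSession(int sessionId) {
            lastEpochBySessionId.put(sessionId, 0); // the full fetch that created it counts as epoch 0
        }

        synchronized Result checkIncremental(int sessionId, int epoch) {
            Integer last = lastEpochBySessionId.get(sessionId);
            if (last == null) {
                return Result.UNKNOWN_SESSION;   // client must fall back to a full fetch
            }
            if (epoch != last + 1) {
                return Result.INVALID_EPOCH;     // duplicate, reordered, or stale request
            }
            lastEpochBySessionId.put(sessionId, epoch);
            return Result.ACCEPT;
        }
    }

A full fetch request needs none of this bookkeeping, which is exactly why it can always serve as the fallback.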
> >>>>>>> > >>>>>> Thanks for the explanation. This makes sense. On the other hand I would > >>>>>> be interested in learning more about whether Becket's solution can help > >>>>>> simplify the protocol by not having the epoch field and whether that is > >>>>>> worth doing. > >>>>> Hi Dong, > >>>>> > >>>>> I commented about this in the other thread. A solution which doesn't > >>>>> maintain session information doesn't work here. > >>>>> > >>>>>>>> 2. It is said that Incremental FetchRequest will include partitions > >>>>> whose > >>>>>>>> fetch offset or maximum number of fetch bytes has been changed. If > >>>>>>>> follower's logStartOffset of a partition has changed, should this > >>>>>>>> partition also be included in the next FetchRequest to the leader? > >>>>>>> Otherwise, it > >>>>>>>> may affect the handling of DeleteRecordsRequest because the leader may > >>>>> not > >>>>>>> know > >>>>>>>> the corresponding data has been deleted on the follower. > >>>>>>> Yeah, the follower should include the partition if the logStartOffset > >>>>>>> has changed. That should be spelled out on the KIP. Fixed. > >>>>>>> > >>>>>>>> 3. In the section "Per-Partition Data", a partition is not considered > >>>>>>>> dirty if its log start offset has changed. Later in the section > >>>>>>> "FetchRequest > >>>>>>>> Changes", it is said that incremental fetch responses will include a > >>>>>>>> partition if its logStartOffset has changed. It seems inconsistent. > >>>>> Can > >>>>>>>> you update the KIP to clarify it? > >>>>>>>> > >>>>>>> In the "Per-Partition Data" section, it does say that logStartOffset > >>>>>>> changes make a partition dirty, though, right? The first bullet point > >>>>>>> is: > >>>>>>> > >>>>>>>> * The LogCleaner deletes messages, and this changes the log start > >>>>> offset > >>>>>>> of the partition on the leader, or > >>>>>>> > >>>>>> Ah I see. I think I didn't notice this because the statement assumes that > >>>>>> the > >>>>>> LogStartOffset in the leader only changes due to LogCleaner. In fact > >>>>>> the > >>>>>> LogStartOffset can change on the leader due to either log retention or > >>>>>> DeleteRecordsRequest. I haven't verified whether LogCleaner can change > >>>>>> LogStartOffset though. It may be a bit better to just say that a > >>>>>> partition is considered dirty if LogStartOffset changes. > >>>>> I agree. It should be straightforward to just resend the partition if > >>>>> logStartOffset changes. > >>>>> > >>>>>>>> 4. In "Fetch Session Caching" section, it is said that each broker > >>>>> has a > >>>>>>>> limited number of slots. How is this number determined? Does this > >>>>> require > >>>>>>>> a new broker config for this number? > >>>>>>> Good point. I added two broker configuration parameters to control > >>>>> this > >>>>>>> number. > >>>>>>> > >>>>>> I am curious to see whether we can avoid some of these new configs. For > >>>>>> example, incremental.fetch.session.cache.slots.per.broker is probably > >>>>> not > >>>>>> necessary because if a leader knows that a FetchRequest comes from a > >>>>>> follower, we probably want the leader to always cache the information > >>>>>> from that follower. Does this make sense? > >>>>> Yeah, maybe we can avoid having > >>>>> incremental.fetch.session.cache.slots.per.broker.
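To illustrate the idea in the quoted exchange above -- always make room for follower sessions and let ordinary consumers compete for whatever is left -- here is a rough policy sketch. The numbers, names, and the policy itself are only an illustration, not a concrete proposal for the config surface:

    // Sketch: followers can always draw on the full slot budget; ordinary
    // clients cannot touch the portion reserved for followers.
    public class SlotPolicySketch {
        private final int totalSlots;
        private final int reservedForFollowers;
        private int slotsInUse = 0;

        public SlotPolicySketch(int totalSlots, int reservedForFollowers) {
            this.totalSlots = totalSlots;
            this.reservedForFollowers = reservedForFollowers;
        }

        public synchronized boolean tryAcquire(boolean isFollower) {
            int limit = isFollower ? totalSlots : totalSlots - reservedForFollowers;
            if (slotsInUse < limit) {
                slotsInUse++;
                return true;
            }
            return false;
        }

        public synchronized void release() {
            slotsInUse--;
        }
    }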
> >>>>> > >>>>>> Maybe we can discuss the config later after there is agreement on how > >>>>>> the > >>>>>> protocol would look. > >>>>>> > >>>>>> > >>>>>>>> What is the error code if broker does > >>>>>>>> not have new log for the incoming FetchRequest? > >>>>>>> Hmm, is there a typo in this question? Maybe you meant to ask what > >>>>>>> happens if there is no new cache slot for the incoming FetchRequest? > >>>>>>> That's not an error-- the incremental fetch session ID just gets set > >>>>>>> to > >>>>>>> 0, indicating no incremental fetch session was created. > >>>>>>> > >>>>>> Yeah there is a typo. You have answered my question. > >>>>>> > >>>>>> > >>>>>>>> 5. Can you clarify what happens if the follower adds a partition to the > >>>>>>>> ReplicaFetcherThread after receiving LeaderAndIsrRequest? Does the leader > >>>>>>>> need to generate a new session for this ReplicaFetcherThread or > >>>>> does it > >>>>>>> re-use > >>>>>>>> the existing session? If it uses a new session, is the old session > >>>>>>>> actively deleted from the slot? > >>>>>>> The basic idea is that you can't make changes, except by sending a > >>>>>>> full > >>>>>>> fetch request. However, perhaps we can allow the client to re-use its > >>>>>>> existing session ID. If the client sets sessionId = id, epoch = 0, it > >>>>>>> could re-initialize the session. > >>>>>>> > >>>>>> Yeah I agree with the basic idea. We probably want to understand in more > >>>>>> detail how this works later. > >>>>> Sounds good. I updated the KIP with this information. A > >>>>> re-initialization should be exactly the same as an initialization, > >>>>> except that it reuses an existing ID.
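Just to make the re-initialization idea concrete, the broker could classify a request purely from its (sessionId, epoch) pair. This is only a sketch of the rule of thumb discussed above; the KIP text still needs to nail down the exact values and semantics:

    // Sketch: how a broker might interpret the (sessionId, epoch) pair.
    // sessionId 0 means "no existing session"; epoch 0 means "full fetch that
    // initializes (or re-initializes) the given session".
    public class FetchMetadataSketch {
        static final int NO_SESSION_ID = 0;
        static final int INITIAL_EPOCH = 0;

        enum Kind { FULL_NEW, FULL_INITIALIZE_OR_REINITIALIZE, INCREMENTAL }

        static Kind classify(int sessionId, int epoch) {
            if (epoch == INITIAL_EPOCH) {
                return sessionId == NO_SESSION_ID
                        ? Kind.FULL_NEW                         // full fetch with no prior session; broker may or may not grant one
                        : Kind.FULL_INITIALIZE_OR_REINITIALIZE; // full fetch reusing an existing session id
            }
            return Kind.INCREMENTAL;                            // must pass the session id + epoch checks
        }
    }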
> >>>>> > >>>>> best, > >>>>> Colin > >>>>> > >>>>> > >>>>>>>> BTW, I think it may be useful if the KIP can include the example > >>>>> workflow > >>>>>>>> of how this feature will be used in case of partition change and so > >>>>> on. > >>>>>>> Yeah, that might help. > >>>>>>> > >>>>>>> best, > >>>>>>> Colin > >>>>>>> > >>>>>>>> Thanks, > >>>>>>>> Dong > >>>>>>>> > >>>>>>>> > >>>>>>>> On Wed, Nov 29, 2017 at 12:13 PM, Colin McCabe<cmcc...@apache.org> > >>>>>>>> wrote: > >>>>>>>> > >>>>>>>>> I updated the KIP with the ideas we've been discussing. > >>>>>>>>> > >>>>>>>>> best, > >>>>>>>>> Colin > >>>>>>>>> > >>>>>>>>> On Tue, Nov 28, 2017, at 08:38, Colin McCabe wrote: > >>>>>>>>>> On Mon, Nov 27, 2017, at 22:30, Jan Filipiak wrote: > >>>>>>>>>>> Hi Colin, thank you for this KIP, it can become a really > >>>>> useful > >>>>>>> thing. > >>>>>>>>>>> I just scanned through the discussion so far and wanted to > >>>>> start a > >>>>>>>>>>> thread to make a decision about keeping the > >>>>>>>>>>> cache with the Connection / Session or having some sort of UUID > >>>>>>> indexed > >>>>>>>>>>> global Map. > >>>>>>>>>>> > >>>>>>>>>>> Sorry if that has been settled already and I missed it. In this > >>>>>>> case > >>>>>>>>>>> could anyone point me to the discussion? > >>>>>>>>>> Hi Jan, > >>>>>>>>>> > >>>>>>>>>> I don't think anyone has discussed the idea of tying the cache > >>>>> to an > >>>>>>>>>> individual TCP session yet. I agree that since the cache is > >>>>>>> intended to > >>>>>>>>>> be used only by a single follower or client, it's an interesting > >>>>>>> thing > >>>>>>>>>> to think about. > >>>>>>>>>> > >>>>>>>>>> I guess the obvious disadvantage is that whenever your TCP > >>>>> session > >>>>>>>>>> drops, you have to make a full fetch request rather than an > >>>>>>> incremental > >>>>>>>>>> one. It's not clear to me how often this happens in practice -- > >>>>> it > >>>>>>>>>> probably depends a lot on the quality of the network. From a > >>>>> code > >>>>>>>>>> perspective, it might also be a bit difficult to access data > >>>>>>> associated > >>>>>>>>>> with the Session from classes like KafkaApis (although we could > >>>>>>> refactor > >>>>>>>>>> it to make this easier). > >>>>>>>>>> > >>>>>>>>>> It's also clear that even if we tie the cache to the session, we > >>>>>>> still > >>>>>>>>>> have to have limits on the number of caches we're willing to > >>>>> create. > >>>>>>>>>> And probably we should reserve some cache slots for each > >>>>> follower, so > >>>>>>>>>> that clients don't take all of them. > >>>>>>>>>> > >>>>>>>>>>> I'd rather see a protocol in which the client is hinting to the > >>>>> broker > >>>>>>>>> that > >>>>>>>>>>> it is going to use the feature, instead of a client > >>>>>>>>>>> realizing that the broker just offered the feature (regardless > >>>>> of > >>>>>>>>>>> protocol version, which should only indicate that the feature > >>>>>>>>>>> would be usable). > >>>>>>>>>> Hmm. I'm not sure what you mean by "hinting." I do think that > >>>>> the > >>>>>>>>>> server should have the option of not accepting incremental > >>>>> requests > >>>>>>> from > >>>>>>>>>> specific clients, in order to save memory space. > >>>>>>>>>> > >>>>>>>>>>> This seems to work better with a per > >>>>>>>>>>> connection/session attached Metadata than with a Map and could > >>>>>>> allow > >>>>>>>>> for > >>>>>>>>>>> easier client implementations. > >>>>>>>>>>> It would also make client-side code easier as there wouldn't > >>>>> be any > >>>>>>>>>>> cache-miss error messages to handle. > >>>>>>>>>> It is nice not to have to handle cache-miss responses, I agree. > >>>>>>>>>> However, TCP sessions aren't exposed to most of our client-side > >>>>> code. > >>>>>>>>>> For example, when the Producer creates a message and hands it > >>>>> off to > >>>>>>> the > >>>>>>>>>> NetworkClient, the NC will transparently re-connect and re-send a > >>>>>>>>>> message if the first send failed. The higher-level code will > >>>>> not be > >>>>>>>>>> informed about whether the TCP session was re-established, > >>>>> whether an > >>>>>>>>>> existing TCP session was used, and so on. So overall I would > >>>>> still > >>>>>>> lean > >>>>>>>>>> towards not coupling this to the TCP session... > >>>>>>>>>> > >>>>>>>>>> best, > >>>>>>>>>> Colin > >>>>>>>>>> > >>>>>>>>>>> Thank you again for the KIP. And again, if this was clarified > >>>>>>> already > >>>>>>>>>>> please drop me a hint where I could read about it. > >>>>>>>>>>> > >>>>>>>>>>> Best Jan > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> On 21.11.2017 22:02, Colin McCabe wrote: > >>>>>>>>>>>> Hi all, > >>>>>>>>>>>> > >>>>>>>>>>>> I created a KIP to improve the scalability and latency of > >>>>>>>>> FetchRequest: > >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability > >>>>>>>>>>>> Please take a look. > >>>>>>>>>>>> > >>>>>>>>>>>> cheers, > >>>>>>>>>>>> Colin >