On Sun, Dec 10, 2017, at 22:10, Colin McCabe wrote: > On Fri, Dec 8, 2017, at 01:16, Jan Filipiak wrote: > > Hi, > > > > sorry for the late reply, busy times :-/ > > > > I would ask you one thing maybe. Since the timeout > > argument seems to be settled I have no further argument > > form your side except the "i don't want to". > > > > Can you see that connection.max.idle.max is the exact time > > that expresses "We expect the client to be away for this long, > > and come back and continue"? > > Hi Jan, > > Sure, connection.max.idle.max is the exact time that we want to keep > around a TCP session. TCP sessions are relatively cheap, so we can > afford to keep them around for 10 minutes by default. Incremental fetch > state is less cheap, so we want to set a shorter timeout for it. We > also want new TCP sessions to be able to reuse an existing incremental > fetch session rather than creating a new one and waiting for the old one > to time out. > > > > > also clarified some stuff inline > > > > Best Jan > > > > > > > > > > On 05.12.2017 23:14, Colin McCabe wrote: > > > On Tue, Dec 5, 2017, at 13:13, Jan Filipiak wrote: > > >> Hi Colin > > >> > > >> Addressing the topic of how to manage slots from the other thread. > > >> With tcp connections all this comes for free essentially. > > > Hi Jan, > > > > > > I don't think that it's accurate to say that cache management "comes for > > > free" by coupling the incremental fetch session with the TCP session. > > > When a new TCP session is started by a fetch request, you still have to > > > decide whether to grant that request an incremental fetch session or > > > not. If your answer is that you always grant the request, I would argue > > > that you do not have cache management. > > First I would say, the client has a big say in this. If the client > > is not going to issue incremental he shouldn't ask for a cache > > when the client ask for the cache we still have all options to deny. > > To put it simply, we have to have some cache management above and beyond > just giving out an incremental fetch session to anyone who has a TCP > session. Therefore, caching does not become simpler if you couple the > fetch session to the TCP session. > > > > > > > > > I guess you could argue that timeouts are cache management, but I don't > > > find that argument persuasive. Anyone could just create a lot of TCP > > > sessions and use a lot of resources, in that case. So there is > > > essentially no limit on memory use. In any case, TCP sessions don't > > > help us implement fetch session timeouts. > > We still have all the options denying the request to keep the state. > > What you want seems like a max connections / ip safeguard. > > I can currently take down a broker with to many connections easily. > > > > > > >> I still would argue we disable it by default and make a flag in the > > >> broker to ask the leader to maintain the cache while replicating and > > >> also only > > >> have it optional in consumers (default to off) so one can turn it on > > >> where it really hurts. MirrorMaker and audit consumers prominently. > > > I agree with Jason's point from earlier in the thread. Adding extra > > > configuration knobs that aren't really necessary can harm usability. > > > Certainly asking people to manually turn on a feature "where it really > > > hurts" seems to fall in that category, when we could easily enable it > > > automatically for them. > > This doesn't make much sense to me. > > There are no tradeoffs to think about from the client's point of view: > it always wants an incremental fetch session. So there is no benefit to > making the clients configure an extra setting. Updating and managing > client configurations is also more difficult than managing broker > configurations for most users. > > > You also wanted to implement > > a "turn of in case of bug"-knob. Having the client indicate if the > > feauture will be used seems reasonable to me., > > True. However, if there is a bug, we could also roll back the client, > so having this configuration knob is not strictly required. > > > > > > >> Otherwise I left a few remarks in-line, which should help to understand > > >> my view of the situation better > > >> > > >> Best Jan > > >> > > >> > > >> On 05.12.2017 08:06, Colin McCabe wrote: > > >>> On Mon, Dec 4, 2017, at 02:27, Jan Filipiak wrote: > > >>>> On 03.12.2017 21:55, Colin McCabe wrote: > > >>>>> On Sat, Dec 2, 2017, at 23:21, Becket Qin wrote: > > >>>>>> Thanks for the explanation, Colin. A few more questions. > > >>>>>> > > >>>>>>> The session epoch is not complex. It's just a number which > > >>>>>>> increments > > >>>>>>> on each incremental fetch. The session epoch is also useful for > > >>>>>>> debugging-- it allows you to match up requests and responses when > > >>>>>>> looking at log files. > > >>>>>> Currently each request in Kafka has a correlation id to help match > > >>>>>> the > > >>>>>> requests and responses. Is epoch doing something differently? > > >>>>> Hi Becket, > > >>>>> > > >>>>> The correlation ID is used within a single TCP session, to uniquely > > >>>>> associate a request with a response. The correlation ID is not unique > > >>>>> (and has no meaning) outside the context of that single TCP session. > > >>>>> > > >>>>> Keep in mind, NetworkClient is in charge of TCP sessions, and > > >>>>> generally > > >>>>> tries to hide that information from the upper layers of the code. So > > >>>>> when you submit a request to NetworkClient, you don't know if that > > >>>>> request creates a TCP session, or reuses an existing one. > > >>>>>>> Unfortunately, this doesn't work. Imagine the client misses an > > >>>>>>> increment fetch response about a partition. And then the partition > > >>>>>>> is > > >>>>>>> never updated after that. The client has no way to know about the > > >>>>>>> partition, since it won't be included in any future incremental > > >>>>>>> fetch > > >>>>>>> responses. And there are no offsets to compare, since the > > >>>>>>> partition is > > >>>>>>> simply omitted from the response. > > >>>>>> I am curious about in which situation would the follower miss a > > >>>>>> response > > >>>>>> of a partition. If the entire FetchResponse is lost (e.g. timeout), > > >>>>>> the > > >>>>>> follower would disconnect and retry. That will result in sending a > > >>>>>> full > > >>>>>> FetchRequest. > > >>>>> Basically, you are proposing that we rely on TCP for reliable delivery > > >>>>> in a distributed system. That isn't a good idea for a bunch of > > >>>>> different reasons. First of all, TCP timeouts tend to be very long. > > >>>>> So > > >>>>> if the TCP session timing out is your error detection mechanism, you > > >>>>> have to wait minutes for messages to timeout. Of course, we add a > > >>>>> timeout on top of that after which we declare the connection bad and > > >>>>> manually close it. But just because the session is closed on one end > > >>>>> doesn't mean that the other end knows that it is closed. So the > > >>>>> leader > > >>>>> may have to wait quite a long time before TCP decides that yes, > > >>>>> connection X from the follower is dead and not coming back, even > > >>>>> though > > >>>>> gremlins ate the FIN packet which the follower attempted to translate. > > >>>>> If the cache state is tied to that TCP session, we have to keep that > > >>>>> cache around for a much longer time than we should. > > >>>> Hi, > > >>>> > > >>>> I see this from a different perspective. The cache expiry time > > >>>> has the same semantic as idle connection time in this scenario. > > >>>> It is the time range we expect the client to come back an reuse > > >>>> its broker side state. I would argue that on close we would get an > > >>>> extra shot at cleaning up the session state early. As opposed to > > >>>> always wait for that duration for expiry to happen. > > >>> Hi Jan, > > >>> > > >>> The idea here is that the incremental fetch cache expiry time can be > > >>> much shorter than the TCP session timeout. In general the TCP session > > >>> timeout is common to all TCP connections, and very long. To make these > > >>> numbers a little more concrete, the TCP session timeout is often > > >>> configured to be 2 hours on Linux. (See > > >>> https://www.cyberciti.biz/tips/linux-increasing-or-decreasing-tcp-sockets-timeouts.html > > >>> ) The timeout I was proposing for incremental fetch sessions was one or > > >>> two minutes at most. > > >> Currently this is taken care of by > > >> connections.max.idle.ms on the broker and defaults to something of few > > >> minutes. > > > It is 10 minutes by default, which is longer than what we want the > > > incremental fetch session timeout to be. There's no reason to couple > > > these two things. > > > > > >> Also something we could let the client change if we really wanted to. > > >> So there is no need to worry about coupling our implementation to some > > >> timeouts given by the OS, with TCP one always has full control over the > > >> worst > > >> times + one gets the extra shot cleaning up early when the close comes > > >> through. > > >> Which is the majority of the cases. > > > In the majority of cases, the TCP session will be re-established. In > > > that case, we have to send a full fetch request rather than an > > > incremental fetch request. > > I actually have a hard time believing this. Do you have any numbers of > > any existing production system? Is it the virtualisation layer cutting > > all the connections? > > We see this only on application crashes and restarts where the app needs > > todo the full anyways > > as it probably continues with stores offsets. > > Yes, TCP connections get dropped. It happens very often in production > clusters, actually. When I was working on Hadoop, one of the most > common questions I heard from newcomers was "why do I see so many > EOFException messages in the logs"? The other thing that happens a lot > is DNS outages or slowness. Public clouds seem to have even more > unstable networks than the on-premise clusters. I am not sure why that > is. > > > > > > >>>>> Secondly, from a software engineering perspective, it's not a good > > >>>>> idea > > >>>>> to try to tightly tie together TCP and our code. We would have to > > >>>>> rework how we interact with NetworkClient so that we are aware of > > >>>>> things > > >>>>> like TCP sessions closing or opening. We would have to be careful > > >>>>> preserve the ordering of incoming messages when doing things like > > >>>>> putting incoming requests on to a queue to be processed by multiple > > >>>>> threads. It's just a lot of complexity to add, and there's no upside. > > >>>> I see the point here. And I had a small chat with Dong Lin already > > >>>> making me aware of this. I tried out the approaches and propose the > > >>>> following: > > >>>> > > >>>> The client start and does a full fetch. It then does incremental > > >>>> fetches. > > >>>> The connection to the broker dies and is re-established by > > >>>> NetworkClient > > >>>> under the hood. > > >>>> The broker sees an incremental fetch without having state => returns > > >>>> error: > > >>>> Client sees the error, does a full fetch and goes back to incrementally > > >>>> fetching. > > >>>> > > >>>> having this 1 additional error round trip is essentially the same as > > >>>> when something > > >>>> with the sessions or epoch changed unexpectedly to the client (say > > >>>> expiry). > > >>>> > > >>>> So its nothing extra added but the conditions are easier to evaluate. > > >>>> Especially since we do everything with NetworkClient. Other > > >>>> implementers > > >>>> on the > > >>>> protocol are free to optimizes this and do not do the errornours > > >>>> roundtrip on the > > >>>> new connection. > > >>>> Its a great plus that the client can know when the error is gonna > > >>>> happen. instead of > > >>>> the server to always have to report back if something changes > > >>>> unexpectedly for the client > > >>> You are assuming that the leader and the follower agree that the TCP > > >>> session drops at the same time. When there are network problems, this > > >>> may not be true. The leader may still think the previous TCP session is > > >>> active. In that case, we have to keep the incremental fetch session > > >>> state around until we learn otherwise (which could be up to that 2 hour > > >>> timeout I mentioned). And if we get a new incoming incremental fetch > > >>> request, we can't assume that it replaces the previous one, because the > > >>> IDs will be different (the new one starts a new session). > > >> As mentioned, no reason to fear some time-outs out of our control > > >>>>> Imagine that I made an argument that client IDs are "complex" and > > >>>>> should > > >>>>> be removed from our APIs. After all, we can just look at the remote > > >>>>> IP > > >>>>> address and TCP port of each connection. Would you think that was a > > >>>>> good idea? The client ID is useful when looking at logs. For > > >>>>> example, > > >>>>> if a rebalance is having problems, you want to know what clients were > > >>>>> having a problem. So having the client ID field to guide you is > > >>>>> actually much less "complex" in practice than not having an ID. > > >>>> I still cant follow why the correlation idea will not help here. > > >>>> Correlating logs with it usually works great. Even with primitive tools > > >>>> like grep > > >>> The correlation ID does help somewhat, but certainly not as much as a > > >>> unique 64-bit ID. The correlation ID is not unique in the broker, just > > >>> unique to a single NetworkClient. Simiarly, the correlation ID is not > > >>> unique on the client side, if there are multiple Consumers, etc. > > >> Can always bump entropy in correlation IDs, never had a problem > > >> of finding to many duplicates. Would be a different KIP though. > > >>>>> Similarly, if metadata responses had epoch numbers (simple > > >>>>> incrementing > > >>>>> numbers), we would not have to debug problems like clients > > >>>>> accidentally > > >>>>> getting old metadata from servers that had been partitioned off from > > >>>>> the > > >>>>> network for a while. Clients would know the difference between old > > >>>>> and > > >>>>> new metadata. So putting epochs in to the metadata request is much > > >>>>> less > > >>>>> "complex" operationally, even though it's an extra field in the > > >>>>> request. > > >>>>> This has been discussed before on the mailing list. > > >>>>> > > >>>>> So I think the bottom line for me is that having the session ID and > > >>>>> session epoch, while it adds two extra fields, reduces operational > > >>>>> complexity and increases debuggability. It avoids tightly coupling us > > >>>>> to assumptions about reliable ordered delivery which tend to be > > >>>>> violated > > >>>>> in practice in multiple layers of the stack. Finally, it avoids the > > >>>>> necessity of refactoring NetworkClient. > > >>>> So there is stacks out there that violate TCP guarantees? And software > > >>>> still works? How can this be? Can you elaborate a little where this > > >>>> can be violated? I am not very familiar with virtualized environments > > >>>> but they can't really violate TCP contracts. > > >>> TCP's guarantees of reliable, in-order transmission certainly can be > > >>> violated. For example, I once had to debug a cluster where a certain > > >>> node had a network card which corrupted its transmissions occasionally. > > >>> With all the layers of checksums, you would think that this was not > > >>> possible, but it happened. We occasionally got corrupted data written > > >>> to disk on the other end because of it. Even more frustrating, the data > > >>> was not corrupted on disk on the sending node-- it was a bug in the > > >>> network card driver that was injecting the errors. > > >> true, but your broker might aswell read a corrupted 600GB as size from > > >> the network and die with OOM instantly. > > > If you read 600 GB as the size from the network, you will not "die with > > > OOM instantly." That would be a bug. Instead, you will notice that 600 > > > GB is greater than max.message.bytes, and close the connection. > > We only check max.message.bytes to late to guard against consumer > > stalling. > > we dont have a notion of max.networkpacket.size before we allocate the > > bytebuffer to read it into. > > "network packets" are not the same thing as "kafka RPCs." One Kafka RPC > could take up mutiple ethernet packets. > > Also, max.message.bytes has nothing to do with "consumer stalling" -- > you are probably thinking about some of the fetch request > configurations. max.message.bytes is used by the RPC system to figure > out whether to read the full incoming RP
Whoops, this is incorrect. I was thinking about "socket.request.max.bytes" rather than "max.message.bytes." Sorry about that. See Ismael's email as well. best, Colin > > best, > Colin > > > > > > > > >> Optimizing for still having functional > > >> software under this circumstances is not reasonable. > > >> You want to get rid of such a > > >> node ASAP and pray that zookeepers ticks get corrupted often enough > > >> that it finally drops out of the cluster. > > >> > > >> There is a good reason that these kinda things > > >> https://issues.apache.org/jira/browse/MESOS-4105 > > >> don't end up as kafka Jiras. In the end you can't run any software in > > >> these containers anymore. Application layer checksums are a neat thing to > > >> fail fast but trying to cope with this probably causes more bad than > > >> good. So I would argue that we shouldn't try this for the fetch > > >> requests. > > > One of the goals of Apache Kafka is to be "a streaming platform... > > > [that] lets you store streams of records in a fault-tolerant way." For > > > more information, see https://kafka.apache.org/intro . Fault-tolerance > > > is explicitly part of the goal of Kafka. Prayer should be optional, not > > > required, when running the software. > > Yes, we need to fail ASAP when we read corrupted packages. It seemed > > to me like you tried to make the case for pray and try to stay alive. > > Fault > > tolerance here means. I am a fishy box i am going to let a good box > > handle > > it and be silent until i get fixed up. > > > > > > Crashing because someone sent you a bad packet is not reasonable > > > behavior. It is a bug. Similarly, bringing down the whole cluster, > > > which could a hundred nodes, because someone had a bad network adapter > > > is not reasonable behavior. It is perhaps reasonable for the cluster to > > > perform worse when hardware is having problems. But that's a different > > > discussion. > > See above. > > > > > > best, > > > Colin > > > > > >> > > >>> However, my point was not about TCP's guarantees being violated. My > > >>> point is that TCP's guarantees are only one small building block to > > >>> build a robust distributed system. TCP basically just says that if you > > >>> get any bytes from the stream, you will get the ones that were sent by > > >>> the sender, in the order they were sent. TCP does not guarantee that > > >>> the bytes you send will get there. It does not guarantee that if you > > >>> close the connection, the other end will know about it in a timely > > >>> fashion. > > >> These are very powerful grantees and since we use TCP we should > > >> piggy pack everything that is reasonable on to it. IMO there is no > > >> need to reimplement correct sequencing again if you get that from > > >> your transport layer. It saves you the complexity, it makes > > >> you application behave way more naturally and your api easier to > > >> understand. > > >> > > >> There is literally nothing the Kernel wont let you decide > > >> especially not any timings. Only noticeable exception being TIME_WAIT > > >> of usually 240 seconds but that already has little todo with the broker > > >> itself and > > >> if we are running out of usable ports because of this then expiring > > >> fetch requests > > >> wont help much anyways. > > >> > > >> I hope I could strengthen the trust you have in userland TCP connection > > >> management. It is really powerful and can be exploited for maximum gains > > >> without much risk in my opinion. > > >> > > >> > > >> > > >>> It does not guarantee that the bytes will be received in a > > >>> certain timeframe, and certainly doesn't guarantee that if you send a > > >>> byte on connection X and then on connection Y, that the remote end will > > >>> read a byte on X before reading a byte on Y. > > >> Noone expects this from two independent paths of any kind. > > >> > > >>> best, > > >>> Colin > > >>> > > >>>> Hope this made my view clearer, especially the first part. > > >>>> > > >>>> Best Jan > > >>>> > > >>>> > > >>>>> best, > > >>>>> Colin > > >>>>> > > >>>>> > > >>>>>> If there is an error such as NotLeaderForPartition is > > >>>>>> returned for some partitions, the follower can always send a full > > >>>>>> FetchRequest. Is there a scenario that only some of the partitions > > >>>>>> in a > > >>>>>> FetchResponse is lost? > > >>>>>> > > >>>>>> Thanks, > > >>>>>> > > >>>>>> Jiangjie (Becket) Qin > > >>>>>> > > >>>>>> > > >>>>>> On Sat, Dec 2, 2017 at 2:37 PM, Colin McCabe<cmcc...@apache.org> > > >>>>>> wrote: > > >>>>>> > > >>>>>>> On Fri, Dec 1, 2017, at 11:46, Dong Lin wrote: > > >>>>>>>> On Thu, Nov 30, 2017 at 9:37 AM, Colin McCabe<cmcc...@apache.org> > > >>>>>>> wrote: > > >>>>>>>>> On Wed, Nov 29, 2017, at 18:59, Dong Lin wrote: > > >>>>>>>>>> Hey Colin, > > >>>>>>>>>> > > >>>>>>>>>> Thanks much for the update. I have a few questions below: > > >>>>>>>>>> > > >>>>>>>>>> 1. I am not very sure that we need Fetch Session Epoch. It seems > > >>>>>>>>>> that > > >>>>>>>>>> Fetch > > >>>>>>>>>> Session Epoch is only needed to help leader distinguish between > > >>>>>>>>>> "a > > >>>>>>> full > > >>>>>>>>>> fetch request" and "a full fetch request and request a new > > >>>>>>> incremental > > >>>>>>>>>> fetch session". Alternatively, follower can also indicate "a full > > >>>>>>> fetch > > >>>>>>>>>> request and request a new incremental fetch session" by setting > > >>>>>>>>>> Fetch > > >>>>>>>>>> Session ID to -1 without using Fetch Session Epoch. Does this > > >>>>>>>>>> make > > >>>>>>> sense? > > >>>>>>>>> Hi Dong, > > >>>>>>>>> > > >>>>>>>>> The fetch session epoch is very important for ensuring > > >>>>>>>>> correctness. It > > >>>>>>>>> prevents corrupted or incomplete fetch data due to network > > >>>>>>>>> reordering > > >>>>>>> or > > >>>>>>>>> loss. > > >>>>>>>>> > > >>>>>>>>> For example, consider a scenario where the follower sends a fetch > > >>>>>>>>> request to the leader. The leader responds, but the response is > > >>>>>>>>> lost > > >>>>>>>>> because of network problems which affected the TCP session. In > > >>>>>>>>> that > > >>>>>>>>> case, the follower must establish a new TCP session and re-send > > >>>>>>>>> the > > >>>>>>>>> incremental fetch request. But the leader does not know that the > > >>>>>>>>> follower didn't receive the previous incremental fetch response. > > >>>>>>>>> It is > > >>>>>>>>> only the incremental fetch epoch which lets the leader know that > > >>>>>>>>> it > > >>>>>>>>> needs to resend that data, and not data which comes afterwards. > > >>>>>>>>> > > >>>>>>>>> You could construct similar scenarios with message reordering, > > >>>>>>>>> duplication, etc. Basically, this is a stateful protocol on an > > >>>>>>>>> unreliable network, and you need to know whether the follower got > > >>>>>>>>> the > > >>>>>>>>> previous data you sent before you move on. And you need to handle > > >>>>>>>>> issues like duplicated or delayed requests. These issues do not > > >>>>>>>>> affect > > >>>>>>>>> the full fetch request, because it is not stateful-- any full > > >>>>>>>>> fetch > > >>>>>>>>> request can be understood and properly responded to in isolation. > > >>>>>>>>> > > >>>>>>>> Thanks for the explanation. This makes sense. On the other hand I > > >>>>>>>> would > > >>>>>>>> be interested in learning more about whether Becket's solution can > > >>>>>>>> help > > >>>>>>>> simplify the protocol by not having the echo field and whether > > >>>>>>>> that is > > >>>>>>>> worth doing. > > >>>>>>> Hi Dong, > > >>>>>>> > > >>>>>>> I commented about this in the other thread. A solution which > > >>>>>>> doesn't > > >>>>>>> maintain session information doesn't work here. > > >>>>>>> > > >>>>>>>>>> 2. It is said that Incremental FetchRequest will include > > >>>>>>>>>> partitions > > >>>>>>> whose > > >>>>>>>>>> fetch offset or maximum number of fetch bytes has been changed. > > >>>>>>>>>> If > > >>>>>>>>>> follower's logStartOffet of a partition has changed, should this > > >>>>>>>>>> partition also be included in the next FetchRequest to the > > >>>>>>>>>> leader? > > >>>>>>>>> Otherwise, it > > >>>>>>>>>> may affect the handling of DeleteRecordsRequest because leader > > >>>>>>>>>> may > > >>>>>>> not > > >>>>>>>>> know > > >>>>>>>>>> the corresponding data has been deleted on the follower. > > >>>>>>>>> Yeah, the follower should include the partition if the > > >>>>>>>>> logStartOffset > > >>>>>>>>> has changed. That should be spelled out on the KIP. Fixed. > > >>>>>>>>> > > >>>>>>>>>> 3. In the section "Per-Partition Data", a partition is not > > >>>>>>>>>> considered > > >>>>>>>>>> dirty if its log start offset has changed. Later in the section > > >>>>>>>>> "FetchRequest > > >>>>>>>>>> Changes", it is said that incremental fetch responses will > > >>>>>>>>>> include a > > >>>>>>>>>> partition if its logStartOffset has changed. It seems > > >>>>>>>>>> inconsistent. > > >>>>>>> Can > > >>>>>>>>>> you update the KIP to clarify it? > > >>>>>>>>>> > > >>>>>>>>> In the "Per-Partition Data" section, it does say that > > >>>>>>>>> logStartOffset > > >>>>>>>>> changes make a partition dirty, though, right? The first bullet > > >>>>>>>>> point > > >>>>>>>>> is: > > >>>>>>>>> > > >>>>>>>>>> * The LogCleaner deletes messages, and this changes the log start > > >>>>>>> offset > > >>>>>>>>> of the partition on the leader., or > > >>>>>>>>> > > >>>>>>>> Ah I see. I think I didn't notice this because statement assumes > > >>>>>>>> that the > > >>>>>>>> LogStartOffset in the leader only changes due to LogCleaner. In > > >>>>>>>> fact the > > >>>>>>>> LogStartOffset can change on the leader due to either log > > >>>>>>>> retention and > > >>>>>>>> DeleteRecordsRequest. I haven't verified whether LogCleaner can > > >>>>>>>> change > > >>>>>>>> LogStartOffset though. It may be a bit better to just say that a > > >>>>>>>> partition is considered dirty if LogStartOffset changes. > > >>>>>>> I agree. It should be straightforward to just resend the partition > > >>>>>>> if > > >>>>>>> logStartOffset changes. > > >>>>>>> > > >>>>>>>>>> 4. In "Fetch Session Caching" section, it is said that each > > >>>>>>>>>> broker > > >>>>>>> has a > > >>>>>>>>>> limited number of slots. How is this number determined? Does this > > >>>>>>> require > > >>>>>>>>>> a new broker config for this number? > > >>>>>>>>> Good point. I added two broker configuration parameters to > > >>>>>>>>> control > > >>>>>>> this > > >>>>>>>>> number. > > >>>>>>>>> > > >>>>>>>> I am curious to see whether we can avoid some of these new > > >>>>>>>> configs. For > > >>>>>>>> example, incremental.fetch.session.cache.slots.per.broker is > > >>>>>>>> probably > > >>>>>>> not > > >>>>>>>> necessary because if a leader knows that a FetchRequest comes from > > >>>>>>>> a > > >>>>>>>> follower, we probably want the leader to always cache the > > >>>>>>>> information > > >>>>>>>> from that follower. Does this make sense? > > >>>>>>> Yeah, maybe we can avoid having > > >>>>>>> incremental.fetch.session.cache.slots.per.broker. > > >>>>>>> > > >>>>>>>> Maybe we can discuss the config later after there is agreement on > > >>>>>>>> how the > > >>>>>>>> protocol would look like. > > >>>>>>>> > > >>>>>>>> > > >>>>>>>>>> What is the error code if broker does > > >>>>>>>>>> not have new log for the incoming FetchRequest? > > >>>>>>>>> Hmm, is there a typo in this question? Maybe you meant to ask > > >>>>>>>>> what > > >>>>>>>>> happens if there is no new cache slot for the incoming > > >>>>>>>>> FetchRequest? > > >>>>>>>>> That's not an error-- the incremental fetch session ID just gets > > >>>>>>>>> set to > > >>>>>>>>> 0, indicating no incremental fetch session was created. > > >>>>>>>>> > > >>>>>>>> Yeah there is a typo. You have answered my question. > > >>>>>>>> > > >>>>>>>> > > >>>>>>>>>> 5. Can you clarify what happens if follower adds a partition to > > >>>>>>>>>> the > > >>>>>>>>>> ReplicaFetcherThread after receiving LeaderAndIsrRequest? Does > > >>>>>>>>>> leader > > >>>>>>>>>> needs to generate a new session for this ReplicaFetcherThread or > > >>>>>>> does it > > >>>>>>>>> re-use > > >>>>>>>>>> the existing session? If it uses a new session, is the old > > >>>>>>>>>> session > > >>>>>>>>>> actively deleted from the slot? > > >>>>>>>>> The basic idea is that you can't make changes, except by sending > > >>>>>>>>> a full > > >>>>>>>>> fetch request. However, perhaps we can allow the client to > > >>>>>>>>> re-use its > > >>>>>>>>> existing session ID. If the client sets sessionId = id, epoch = > > >>>>>>>>> 0, it > > >>>>>>>>> could re-initialize the session. > > >>>>>>>>> > > >>>>>>>> Yeah I agree with the basic idea. We probably want to understand > > >>>>>>>> more > > >>>>>>>> detail about how this works later. > > >>>>>>> Sounds good. I updated the KIP with this information. A > > >>>>>>> re-initialization should be exactly the same as an initialization, > > >>>>>>> except that it reuses an existing ID. > > >>>>>>> > > >>>>>>> best, > > >>>>>>> Colin > > >>>>>>> > > >>>>>>> > > >>>>>>>>>> BTW, I think it may be useful if the KIP can include the example > > >>>>>>> workflow > > >>>>>>>>>> of how this feature will be used in case of partition change and > > >>>>>>>>>> so > > >>>>>>> on. > > >>>>>>>>> Yeah, that might help. > > >>>>>>>>> > > >>>>>>>>> best, > > >>>>>>>>> Colin > > >>>>>>>>> > > >>>>>>>>>> Thanks, > > >>>>>>>>>> Dong > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> On Wed, Nov 29, 2017 at 12:13 PM, Colin > > >>>>>>>>>> McCabe<cmcc...@apache.org> > > >>>>>>>>>> wrote: > > >>>>>>>>>> > > >>>>>>>>>>> I updated the KIP with the ideas we've been discussing. > > >>>>>>>>>>> > > >>>>>>>>>>> best, > > >>>>>>>>>>> Colin > > >>>>>>>>>>> > > >>>>>>>>>>> On Tue, Nov 28, 2017, at 08:38, Colin McCabe wrote: > > >>>>>>>>>>>> On Mon, Nov 27, 2017, at 22:30, Jan Filipiak wrote: > > >>>>>>>>>>>>> Hi Colin, thank you for this KIP, it can become a really > > >>>>>>> useful > > >>>>>>>>> thing. > > >>>>>>>>>>>>> I just scanned through the discussion so far and wanted to > > >>>>>>> start a > > >>>>>>>>>>>>> thread to make as decision about keeping the > > >>>>>>>>>>>>> cache with the Connection / Session or having some sort of > > >>>>>>>>>>>>> UUID > > >>>>>>>>> indN > > >>>>>>>>>>> exed > > >>>>>>>>>>>>> global Map. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> Sorry if that has been settled already and I missed it. In > > >>>>>>>>>>>>> this > > >>>>>>>>> case > > >>>>>>>>>>>>> could anyone point me to the discussion? > > >>>>>>>>>>>> Hi Jan, > > >>>>>>>>>>>> > > >>>>>>>>>>>> I don't think anyone has discussed the idea of tying the cache > > >>>>>>> to an > > >>>>>>>>>>>> individual TCP session yet. I agree that since the cache is > > >>>>>>>>> intended to > > >>>>>>>>>>>> be used only by a single follower or client, it's an > > >>>>>>>>>>>> interesting > > >>>>>>>>> thing > > >>>>>>>>>>>> to think about. > > >>>>>>>>>>>> > > >>>>>>>>>>>> I guess the obvious disadvantage is that whenever your TCP > > >>>>>>> session > > >>>>>>>>>>>> drops, you have to make a full fetch request rather than an > > >>>>>>>>> incremental > > >>>>>>>>>>>> one. It's not clear to me how often this happens in practice > > >>>>>>>>>>>> -- > > >>>>>>> it > > >>>>>>>>>>>> probably depends a lot on the quality of the network. From a > > >>>>>>> code > > >>>>>>>>>>>> perspective, it might also be a bit difficult to access data > > >>>>>>>>> associated > > >>>>>>>>>>>> with the Session from classes like KafkaApis (although we could > > >>>>>>>>> refactor > > >>>>>>>>>>>> it to make this easier). > > >>>>>>>>>>>> > > >>>>>>>>>>>> It's also clear that even if we tie the cache to the session, > > >>>>>>>>>>>> we > > >>>>>>>>> still > > >>>>>>>>>>>> have to have limits on the number of caches we're willing to > > >>>>>>> create. > > >>>>>>>>>>>> And probably we should reserve some cache slots for each > > >>>>>>> follower, so > > >>>>>>>>>>>> that clients don't take all of them. > > >>>>>>>>>>>> > > >>>>>>>>>>>>> Id rather see a protocol in which the client is hinting the > > >>>>>>> broker > > >>>>>>>>>>> that, > > >>>>>>>>>>>>> he is going to use the feature instead of a client > > >>>>>>>>>>>>> realizing that the broker just offered the feature (regardless > > >>>>>>> of > > >>>>>>>>>>>>> protocol version which should only indicate that the feature > > >>>>>>>>>>>>> would be usable). > > >>>>>>>>>>>> Hmm. I'm not sure what you mean by "hinting." I do think that > > >>>>>>> the > > >>>>>>>>>>>> server should have the option of not accepting incremental > > >>>>>>> requests > > >>>>>>>>> from > > >>>>>>>>>>>> specific clients, in order to save memory space. > > >>>>>>>>>>>> > > >>>>>>>>>>>>> This seems to work better with a per > > >>>>>>>>>>>>> connection/session attached Metadata than with a Map and could > > >>>>>>>>> allow > > >>>>>>>>>>> for > > >>>>>>>>>>>>> easier client implementations. > > >>>>>>>>>>>>> It would also make Client-side code easier as there wouldn't > > >>>>>>> be any > > >>>>>>>>>>>>> Cache-miss error Messages to handle. > > >>>>>>>>>>>> It is nice not to have to handle cache-miss responses, I agree. > > >>>>>>>>>>>> However, TCP sessions aren't exposed to most of our client-side > > >>>>>>> code. > > >>>>>>>>>>>> For example, when the Producer creates a message and hands it > > >>>>>>> off to > > >>>>>>>>> the > > >>>>>>>>>>>> NetworkClient, the NC will transparently re-connect and > > >>>>>>>>>>>> re-send a > > >>>>>>>>>>>> message if the first send failed. The higher-level code will > > >>>>>>> not be > > >>>>>>>>>>>> informed about whether the TCP session was re-established, > > >>>>>>> whether an > > >>>>>>>>>>>> existing TCP session was used, and so on. So overall I would > > >>>>>>> still > > >>>>>>>>> lean > > >>>>>>>>>>>> towards not coupling this to the TCP session... > > >>>>>>>>>>>> > > >>>>>>>>>>>> best, > > >>>>>>>>>>>> Colin > > >>>>>>>>>>>> > > >>>>>>>>>>>>> Thank you again for the KIP. And again, if this was > > >>>>>>>>>>>>> clarified > > >>>>>>>>> already > > >>>>>>>>>>>>> please drop me a hint where I could read about it. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> Best Jan > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> On 21.11.2017 22:02, Colin McCabe wrote: > > >>>>>>>>>>>>>> Hi all, > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> I created a KIP to improve the scalability and latency of > > >>>>>>>>>>> FetchRequest: > > >>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP- > > >>>>>>>>>>> 227%3A+Introduce+Incremental+FetchRequests+to+Increase+ > > >>>>>>>>>>> Partition+Scalability > > >>>>>>>>>>>>>> Please take a look. > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> cheers, > > >>>>>>>>>>>>>> Colin > >