Thanks for the detailed explanation.

-Matthias
On 10/23/18 12:38 PM, Zahari Dichev wrote:
> Hi there Matthias, I looked through the code of Kafka Streams. Quite
> impressive work! If I have to put the logic of buffering within the
> context of what we are doing in Akka though, I might end up with the
> following situation:
>
> 1. Poll is called with two partitions being active: *TP1*, *TP2*.
> 2. We get some data for both; both also prefetch some data.
> 3. So now we have some data that we have obtained, and some data that
> sits in the buffer of the fetcher, waiting to be obtained.
> 4. We put the data that we have obtained from the poll into the
> respective buffers of the partitions.
> 5. Since both of our buffers are "full", we call pause on both *TP1*
> and *TP2*.
> 6. A little time passes and the client of *TP1* has processed all its
> records from the buffer, while the one of *TP2* has processed none.
> 7. The buffer of *TP1* is now empty, so we call resume on *TP1*.
> 8. We call poll again with *TP1* resumed and *TP2* paused.
> 9. We get some records for *TP1*, and we throw away all the records
> that were prefetched for *TP2* in step 2.
>
> This can go on and on, and given the dynamic nature of processing
> speed and the theoretically unlimited number of topic partitions, I
> find it possible that this scenario happens more than once over the
> lifetime of a client. Instead of trying to calculate the probability
> of this happening and attempting to minimise it, I would prefer one of
> two options:
>
> 1. Having control that allows me to enable the returning of already
> prefetched data, and simply storing it in a buffer of my own until I
> have enough capacity to deal with it,
>
> OR
>
> 2. Keeping the data in the fetcher and not throwing it away, but using
> it on the next poll (not sure how viable that is, as I have not looked
> at the details of it all).
>
> The first option is what I suggested initially, and the second option
> is the one that would allow us to skip the introduction of a
> configuration parameter, as Colin suggested. These are the things I
> can suggest at the moment. As mentioned, I am willing to carry out the
> work. There is also an official discussion thread, but I guess we have
> deviated from that, so I can just put the current one in JIRA instead,
> if that is OK?
>
> Matthias, regarding how the fetcher works: from what I have looked at,
> whenever the consumer polls and returns some data, we immediately
> issue another fetch request, which delivers the records that are
> returned on the next poll. All these fetched records that have not yet
> made it to the caller of poll are thrown away if the partition is in a
> paused state at the time of the next poll(). This is what is causing
> the inefficiency.
>
> Any more comments are welcome.
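[The nine-step scenario above can be reproduced with the plain Java
consumer API. A minimal sketch; the broker address, topic name, and
group id are made up for illustration:]

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PausedPrefetchDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumed broker
        props.put("group.id", "demo-group");               // hypothetical group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        TopicPartition tp1 = new TopicPartition("topic", 0); // TP1
        TopicPartition tp2 = new TopicPartition("topic", 1); // TP2
        consumer.assign(Arrays.asList(tp1, tp2));

        // Steps 1-3: poll returns records for both partitions, and the
        // fetcher issues prefetch requests for both behind the scenes.
        ConsumerRecords<String, String> first = consumer.poll(Duration.ofMillis(100));

        // Steps 4-5: both downstream buffers are now "full", so pause both.
        consumer.pause(Arrays.asList(tp1, tp2));

        // Steps 6-7: the stream reading TP1 catches up, so only TP1 resumes.
        consumer.resume(Arrays.asList(tp1));

        // Steps 8-9: this poll returns records for TP1, but everything the
        // fetcher had already prefetched for the still-paused TP2 is dropped
        // and will be fetched again after TP2 is resumed.
        ConsumerRecords<String, String> second = consumer.poll(Duration.ofMillis(100));

        consumer.close();
    }
}

[Nothing in this sequence violates the consumer contract; the waste is
that TP2's completed prefetch is discarded and later re-requested,
which shows up as the overlapping fetch requests described in the
alpakka-kafka issue linked below.]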
>
> On Mon, Oct 22, 2018 at 6:00 AM Ismael Juma <isma...@gmail.com> wrote:
>
>> Hi,
>>
>> I think a KIP to discuss a concrete proposal makes sense. One
>> suggestion is to explore the possibility of fixing the issue without
>> a new config. Would that break existing users? Generally, we should
>> strive to avoid configs if at all possible.
>>
>> Ismael
>>
>> On 16 Oct 2018 12:30 am, "Zahari Dichev" <zaharidic...@gmail.com> wrote:
>>
>> Hi there Kafka developers,
>>
>> I am currently trying to find a solution to an issue that has been
>> manifesting itself in the Akka Streams implementation of the Kafka
>> connector. When it comes to consuming messages, the implementation
>> relies heavily on the fact that we can pause and resume partitions.
>> In some situations, when a single consumer instance is shared among
>> several streams, we might end up frequently pausing and unpausing a
>> set of topic partitions, which is the main facility that allows us to
>> implement back pressure. This, however, has certain disadvantages,
>> especially when there are two streams that differ in terms of
>> processing speed.
>>
>> To articulate the issue more clearly, imagine that a consumer
>> maintains assignments for two topic partitions, *TP1* and *TP2*. This
>> consumer is shared by two streams, *S1* and *S2*. So effectively,
>> when we have demand from only one of the streams, *S1*, we will pause
>> one of the topic partitions, *TP2*, and call *poll()* on the consumer
>> to retrieve only the records for the demanded topic partition, *TP1*.
>> The result is that all the records that have been prefetched for
>> *TP2* are now thrown away by the fetcher ("*Not returning fetched
>> records for assigned partition TP2 since it is no longer
>> fetchable*"). If we extrapolate that to multiple streams sharing the
>> same consumer, we might quickly end up in a situation where we throw
>> away prefetched data quite often. This does not seem like the most
>> efficient approach, and in fact it produces quite a lot of
>> overlapping fetch requests, as illustrated in the following issue:
>>
>> https://github.com/akka/alpakka-kafka/issues/549
>>
>> I am writing this email to get some initial opinions on a KIP I was
>> thinking about. What if we give the clients of the Consumer API a bit
>> more control over what to do with this prefetched data? Two options I
>> am wondering about:
>>
>> 1. Introduce a configuration setting, such as
>> *"return-prefetched-data-for-paused-topic-partitions = false"* (have
>> to think of a better name), which when set to true will return what
>> is prefetched instead of throwing it away on calling *poll()*. Since
>> this is an amount of data bounded by the maximum size of the
>> prefetch, we can control the maximum amount of records returned. The
>> client of the Consumer API can then be responsible for keeping that
>> data around and using it when appropriate (i.e. when demand is
>> present).
>>
>> 2. Introduce a facility to pass in a buffer into which the prefetched
>> records are drained when poll is called and paused partitions have
>> some prefetched records.
>>
>> Any opinions on the matter are welcome. Thanks a lot!
>>
>>
>> Zahari Dichev
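[For concreteness, a sketch of what option 1 might look like from the
caller's side. The configuration key and the return-while-paused
behaviour are hypothetical, taken from the proposal above; they do not
exist in today's consumer. The point is only that the client, not the
fetcher, decides when parked records are consumed:]

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PrefetchParkingSketch {
    // Client-side buffer holding records for partitions whose streams
    // currently have no demand.
    private final Map<TopicPartition, Queue<ConsumerRecord<String, String>>> parked =
            new HashMap<>();

    public void pollOnce(KafkaConsumer<String, String> consumer,
                         TopicPartition pausedPartition) {
        // Hypothetical setting from option 1 (not a real Kafka config):
        // props.put("return-prefetched-data-for-paused-topic-partitions", "true");
        // Under that assumption, poll() may also return records already
        // prefetched for the paused partition instead of discarding them.
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            TopicPartition tp = new TopicPartition(record.topic(), record.partition());
            if (tp.equals(pausedPartition)) {
                // Park records for the paused partition until its stream
                // signals demand again.
                parked.computeIfAbsent(tp, k -> new ArrayDeque<>()).add(record);
            } else {
                process(record);
            }
        }
    }

    // Drain parked records once demand returns (e.g. alongside resume(tp)).
    public void onDemand(TopicPartition tp) {
        Queue<ConsumerRecord<String, String>> queue = parked.remove(tp);
        if (queue != null) {
            queue.forEach(this::process);
        }
    }

    private void process(ConsumerRecord<String, String> record) {
        System.out.printf("%s-%d@%d: %s%n",
                record.topic(), record.partition(), record.offset(), record.value());
    }
}

[As the proposal notes, the parked data is bounded by the maximum size
of the prefetch per partition, so this client-side buffer cannot grow
beyond what the fetcher would have prefetched anyway.]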