Thanks for the detailed explanation.

-Matthias

On 10/23/18 12:38 PM, Zahari Dichev wrote:
> Hi there Matthias, I looked through the code of Kafka Streams. Quite
> impressive work! If I put the buffering logic into the context of what
> we are doing in Akka though, I might end up with the following
> situation (sketched in code below the list):
> 
> 1. Poll is called with two partitions being active: *TP1*, *TP2*.
> 2. We get some data for both; the fetcher also prefetches more data for
> each of them.
> 3. So now we have some data that we have obtained and some data that sits
> in the buffer of the fetcher, waiting to be obtained.
> 4. We put the data obtained from the poll into the respective buffers of
> the partitions.
> 5. Since both of our buffers are "full", we call pause on both *TP1* and
> *TP2*.
> 6. A little time passes, and the client of *TP1* has processed all of its
> records from the buffer, while the one of *TP2* has processed none.
> 7. The buffer of *TP1* is now empty, so we call resume on *TP1*.
> 8. We call poll again with *TP1* resumed and *TP2* paused.
> 9. We get some records for *TP1*, and all the records that were prefetched
> for *TP2* in step 2 are thrown away.
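> 
> To make the scenario concrete, here is a minimal sketch against the
> plain Java consumer. The topic name, buffer handling and timeouts are
> illustrative; in the real connector the buffers live inside the Akka
> stages:
> 
>     import java.time.Duration;
>     import java.util.*;
>     import org.apache.kafka.clients.consumer.*;
>     import org.apache.kafka.common.TopicPartition;
> 
>     public class PauseResumeScenario {
>         public static void main(String[] args) {
>             Properties props = new Properties();
>             props.put("bootstrap.servers", "localhost:9092");
>             props.put("key.deserializer",
>                 "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>             props.put("value.deserializer",
>                 "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>             KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
> 
>             TopicPartition tp1 = new TopicPartition("topic", 0);
>             TopicPartition tp2 = new TopicPartition("topic", 1);
>             consumer.assign(Arrays.asList(tp1, tp2));
> 
>             Deque<ConsumerRecord<byte[], byte[]>> buf1 = new ArrayDeque<>();
>             Deque<ConsumerRecord<byte[], byte[]>> buf2 = new ArrayDeque<>();
> 
>             // Steps 1-4: poll returns data for both partitions while the
>             // fetcher prefetches more behind the scenes.
>             ConsumerRecords<byte[], byte[]> first =
>                 consumer.poll(Duration.ofMillis(100));
>             buf1.addAll(first.records(tp1));
>             buf2.addAll(first.records(tp2));
> 
>             // Step 5: both buffers are "full", so pause both partitions.
>             consumer.pause(Arrays.asList(tp1, tp2));
> 
>             // Steps 6-7: the stream behind TP1 drains its buffer, while
>             // the one behind TP2 processes nothing.
>             buf1.clear(); // stands in for "all records processed"
>             consumer.resume(Collections.singletonList(tp1));
> 
>             // Steps 8-9: this poll returns records for TP1 only; whatever
>             // was prefetched for TP2 in step 2 is dropped by the fetcher
>             // and has to be fetched again once TP2 is resumed.
>             ConsumerRecords<byte[], byte[]> second =
>                 consumer.poll(Duration.ofMillis(100));
>             buf1.addAll(second.records(tp1));
>             consumer.close();
>         }
>     }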
> 
> This can go on and on, and given the dynamic nature of record processing
> speeds and the theoretically unlimited number of topic partitions, this
> scenario can plausibly occur many times over the lifetime of a client.
> Instead of trying to calculate the probability of this happening and
> attempting to minimise it, I would prefer one of two options:
> 
> 1. Having control that allows me to enable the returning of already
> prefetched data, and simply store it in a buffer of my own until I have
> enough capacity to deal with it (see the sketch after option 2)
> 
> OR
> 
> 2. Keeping the data in the fetcher, not throwing it away, and serving it
> on the next poll (not sure how viable that is, as I have not looked at
> all the details).
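> 
> To make option 1 concrete: assuming a hypothetical flag that makes
> poll() hand back records prefetched for paused partitions instead of
> dropping them, the client side could look roughly like this (reusing
> the consumer and partitions from the sketch above):
> 
>     Map<TopicPartition, Deque<ConsumerRecord<byte[], byte[]>>> localBuffers =
>         new HashMap<>();
> 
>     ConsumerRecords<byte[], byte[]> records =
>         consumer.poll(Duration.ofMillis(100));
>     for (TopicPartition tp : records.partitions()) {
>         // Even if tp is currently paused, stash its records instead of
>         // losing them, and drain them once downstream demand arrives.
>         localBuffers.computeIfAbsent(tp, t -> new ArrayDeque<>())
>                     .addAll(records.records(tp));
>     }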
> 
> The first option is what I suggested initially, and the second option is
> the one that would allow us to skip introducing a configuration
> parameter, as Colin suggested. These are the things I can suggest at the
> moment. As mentioned, I am willing to carry out the work. There is also an
> official discussion thread, but I guess we have deviated from that, so I
> can just put the current one in JIRA instead, if that is OK?
> 
> Matthias, regarding how the fetcher works: from what I have looked at,
> whenever the consumer polls and returns some data, we immediately issue
> another fetch request that delivers the records returned on the next
> poll. All these fetched records that have not made it to the caller of
> poll() are thrown away if, at the time of the next poll(), the partition
> is in a paused state. This is what is causing the inefficiency.
> 
> Any more comments are welcome.
> 
> On Mon, Oct 22, 2018 at 6:00 AM Ismael Juma <isma...@gmail.com> wrote:
> 
>> Hi,
>>
>> I think a KIP to discuss a concrete proposal makes sense. One suggestion is
>> to explore the possibility of fixing the issue without a new config. Would
>> that break existing users? Generally, we should strive to avoid configs
>> if at all possible.
>>
>> Ismael
>>
>> On 16 Oct 2018 12:30 am, "Zahari Dichev" <zaharidic...@gmail.com> wrote:
>>
>> Hi there Kafka developers,
>>
>> I am currently trying to find a solution to an issue that has been
>> manifesting itself in the Akka Streams implementation of the Kafka
>> connector. When it comes to consuming messages, the implementation relies
>> heavily on the fact that we can pause and resume partitions. In some
>> situations, when a single consumer instance is shared among several
>> streams, we might end up frequently pausing and resuming a set of topic
>> partitions; pause/resume is the main facility that allows us to implement
>> back pressure. This however has certain disadvantages, especially when
>> the streams differ in terms of processing speed.
>>
>> To articulate the issue more clearly, imagine that a consumer maintains
>> assignments for two topic partitions, *TP1* and *TP2*. This consumer is
>> shared by two streams - *S1* and *S2*. So effectively, when we have demand
>> from only one of the streams - *S1* - we will pause one of the topic
>> partitions, *TP2*, and call *poll()* on the consumer to retrieve only the
>> records for the demanded topic partition - *TP1*. The result is that all
>> the records that have been prefetched for *TP2* are thrown away by the
>> fetcher (*"Not returning fetched records for assigned partition TP2 since
>> it is no longer fetchable"*). If we extrapolate that to multiple streams
>> sharing the same consumer, we might quickly end up in a situation where we
>> throw away prefetched data quite often. This does not seem like the most
>> efficient approach, and it in fact produces quite a lot of overlapping
>> fetch requests, as illustrated in the following issue (a minimal
>> reproduction follows the link):
>>
>> https://github.com/akka/alpakka-kafka/issues/549
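>>
>> Assuming an already-configured KafkaConsumer named consumer and the
>> usual client imports, the drop can be reproduced roughly like this:
>>
>>     TopicPartition tp1 = new TopicPartition("topic", 0);
>>     TopicPartition tp2 = new TopicPartition("topic", 1);
>>     consumer.assign(Arrays.asList(tp1, tp2));
>>
>>     consumer.poll(Duration.ofMillis(100));          // prefetch for both
>>     consumer.pause(Collections.singletonList(tp2)); // only S1 has demand
>>     // The next poll discards whatever was prefetched for TP2 and logs
>>     // the "no longer fetchable" message quoted above.
>>     consumer.poll(Duration.ofMillis(100));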
>>
>> I am writing this email to get some initial opinions on a KIP I was
>> thinking about. What if we gave the clients of the Consumer API a bit more
>> control over what to do with this prefetched data? Two options I am
>> wondering about:
>>
>> 1. Introduce a configuration setting, such as
>> *return-prefetched-data-for-paused-topic-partitions* (defaulting to false;
>> I have to think of a better name), which when set to true will return what
>> has been prefetched instead of throwing it away when *poll()* is called.
>> Since this amount of data is bounded by the maximum size of the prefetch,
>> we can control the maximum number of records returned. The client of the
>> Consumer API can then be responsible for keeping that data around and
>> using it when appropriate (i.e. when demand is present). A configuration
>> sketch follows.
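>>
>> For illustration only (the key name is hypothetical and up for
>> bikeshedding), option 1 would amount to a single consumer property:
>>
>>     Properties props = new Properties();
>>     props.put("bootstrap.servers", "localhost:9092");
>>     props.put("key.deserializer",
>>         "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>>     props.put("value.deserializer",
>>         "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>>     // Hypothetical flag from option 1; does not exist today.
>>     props.put("return.prefetched.data.for.paused.partitions", "true");
>>     KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);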
>>
>> 2. Introduce a facility to pass in a buffer into which the prefetched
>> records are drained when poll is called and paused partitions have some
>> prefetched records (a possible shape is sketched below).
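>>
>> Purely as an illustration of the shape such an API could take (nothing
>> like this exists today):
>>
>>     import java.time.Duration;
>>     import java.util.List;
>>     import java.util.Map;
>>     import org.apache.kafka.clients.consumer.Consumer;
>>     import org.apache.kafka.clients.consumer.ConsumerRecord;
>>     import org.apache.kafka.clients.consumer.ConsumerRecords;
>>     import org.apache.kafka.common.TopicPartition;
>>
>>     // Hypothetical extension: poll() takes a drain map, and records
>>     // prefetched for paused partitions are moved into it instead of
>>     // being thrown away by the fetcher.
>>     public interface DrainingConsumer<K, V> extends Consumer<K, V> {
>>         ConsumerRecords<K, V> poll(Duration timeout,
>>             Map<TopicPartition, List<ConsumerRecord<K, V>>> pausedDrain);
>>     }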
>>
>> Any opinions on the matter are welcome. Thanks a lot!
>>
>>
>> Zahari Dichev
>>
> 
