Just my 2 cents. I am not 100% sure if we would need to change the consumer for this. While I still think that KIP-349 might be valuable, it seems to be complementary/orthogonal to the issue discussed here.
For Kafka Streams, we have a related scenario, and what Kafka Streams does is add its own buffer on top of the consumer. Thus, for each `poll()` all data is put into this buffer, and Streams can then decide which record to process first. For buffers that have data, we can call `pause()` without losing fetched data (not a 100% guarantee, depending on fetch size, max.poll.records, etc., but with high probability), and if a buffer gets empty we `resume()` the partition. As Akka Streams builds on top of the consumer, it could implement a similar pattern. Of course, one cannot use `auto.commit` on the consumer; commits need to be managed manually (ie, only data that was taken out of the buffer and actually processed can be committed).

For the `MessageChooser` idea, I also still think it might be useful, but it's unclear to me if this should be a consumer feature or built on top of the consumer (maybe it could be a Streams feature, as Streams is built on top of the consumer).

Thoughts?

-Matthias

On 10/18/18 9:27 AM, Jan Filipiak wrote:
> The idea for you would be that MessageChooser could hang on to the
> prefetched messages.
>
> ccing cmcc...@apache.org
>
> @Collin
> just for you to see that MessageChooser is a powerful abstraction.
>
> :)
>
> Best jan
>
> On 18.10.2018 13:59, Zahari Dichev wrote:
>> Jan,
>>
>> Quite insightful indeed. I think your propositions are valid.
>>
>> Ryanne,
>>
>> I understand that consumers are using a pull model... And yes, indeed if a
>> consumer is not ready for more records, it surely should not call poll,
>> except that it needs to do so periodically in order to indicate that it is
>> alive. Forget about the "backpressure"; I guess I was wrong in phrasing it
>> that way, so let's not get caught up on it.
>>
>> You say pause/resume can be used to prioritise certain topics/partitions
>> over others. And indeed this is the case. So instead of thinking about it
>> in terms of backpressure, let's put it in a different way.
>> The Akka Streams connector would like to prioritise certain topics over
>> others, using one consumer instance. On top of that, add the detail that
>> the priorities change quite frequently (which translates to calling
>> pause/resume frequently). So all that being said, what would be a proper
>> way to handle the situation without throwing the prefetched records away
>> when calling poll on a consumer that happens to have a topic that was
>> recently paused (and that might be un-paused soon)? Am I the only one who
>> considers that an actual problem with the use of pause/resume? Not sure
>> how to explain the situation in a better way...
>>
>> Zahari
>>
>>
>> On Thu, Oct 18, 2018 at 9:46 AM Zahari Dichev <zaharidic...@gmail.com>
>> wrote:
>>
>>> Thanks a lot Jan,
>>>
>>> I will read it.
>>>
>>> Zahari
>>>
>>> On Thu, Oct 18, 2018 at 9:31 AM Jan Filipiak <jan.filip...@trivago.com>
>>> wrote:
>>>
>>>> especially my suggestions ;)
>>>>
>>>> On 18.10.2018 08:30, Jan Filipiak wrote:
>>>>> Hi Zahari,
>>>>>
>>>>> would you be willing to scan through the KIP-349 discussion a little?
>>>>> I think it has suggestions that could be interesting for you.
>>>>>
>>>>> Best Jan
>>>>>
>>>>> On 16.10.2018 09:29, Zahari Dichev wrote:
>>>>>> Hi there Kafka developers,
>>>>>>
>>>>>> I am currently trying to find a solution to an issue that has been
>>>>>> manifesting itself in the Akka Streams implementation of the Kafka
>>>>>> connector. When it comes to consuming messages, the implementation
>>>>>> relies heavily on the fact that we can pause and resume partitions.
>>>>>> In some situations when a single consumer instance is shared among
>>>>>> several streams, we might end up frequently pausing and unpausing a
>>>>>> set of topic partitions, which is the main facility that allows us to
>>>>>> implement back pressure. This, however, has certain disadvantages,
>>>>>> especially when there are two consumers that differ in terms of
>>>>>> processing speed.
>>>>>>
>>>>>> To articulate the issue more clearly, imagine that a consumer
>>>>>> maintains assignments for two topic partitions *TP1* and *TP2*. This
>>>>>> consumer is shared by two streams - S1 and S2. So effectively, when
>>>>>> we have demand from only one of the streams - *S1* - we will pause
>>>>>> one of the topic partitions - *TP2* - and call *poll()* on the
>>>>>> consumer to only retrieve the records for the demanded topic
>>>>>> partition - *TP1*. The result of that is that all the records that
>>>>>> have been prefetched for *TP2* are now thrown away by the fetcher
>>>>>> (*"Not returning fetched records for assigned partition TP2 since it
>>>>>> is no longer fetchable"*). If we extrapolate that to multiple streams
>>>>>> sharing the same consumer, we might quickly end up in a situation
>>>>>> where we throw away prefetched data quite often. This does not seem
>>>>>> like the most efficient approach, and in fact it produces quite a lot
>>>>>> of overlapping fetch requests, as illustrated in the following issue:
>>>>>>
>>>>>> https://github.com/akka/alpakka-kafka/issues/549
>>>>>>
>>>>>> I am writing this email to get some initial opinions on a KIP I was
>>>>>> thinking about. What if we give the clients of the Consumer API a bit
>>>>>> more control over what to do with this prefetched data? Two options I
>>>>>> am wondering about:
>>>>>>
>>>>>> 1. Introduce a configuration setting, such as
>>>>>> *"return-prefetched-data-for-paused-topic-partitions = false"* (have
>>>>>> to think of a better name), which when set to true will return what
>>>>>> is prefetched instead of throwing it away on calling *poll()*. Since
>>>>>> this amount of data is bounded by the maximum size of the prefetch,
>>>>>> we can control the maximum number of records returned. The client of
>>>>>> the consumer API can then be responsible for keeping that data around
>>>>>> and using it when appropriate (i.e.
>>>>>> when demand is present).
>>>>>>
>>>>>> 2. Introduce a facility to pass in a buffer into which the prefetched
>>>>>> records are drained when poll is called and paused partitions have
>>>>>> some prefetched records.
>>>>>>
>>>>>> Any opinions on the matter are welcome. Thanks a lot!
>>>>>>
>>>>>> Zahari Dichev
>>>>>>
>>>>
>>>
>>
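The buffering pattern Matthias describes for Kafka Streams (keep a per-partition buffer above the consumer, pause partitions whose buffer still holds data, resume drained ones, and commit only offsets of records actually processed, with auto.commit disabled) can be sketched roughly as below. This is a toy Python model against a stand-in consumer object, not the real KafkaConsumer API; all class and method names here are made up for illustration.

```python
from collections import defaultdict, deque

class BufferingConsumer:
    """Illustrative sketch of a buffer layered on top of a consumer.

    `consumer` is any object exposing poll/pause/resume/commit; in real
    code this would wrap a KafkaConsumer with enable.auto.commit=false.
    Records are modeled as (offset, value) tuples per partition.
    """

    def __init__(self, consumer):
        self.consumer = consumer
        self.buffers = defaultdict(deque)  # partition -> buffered records
        self.processed = {}                # partition -> next offset to commit

    def poll_into_buffers(self):
        # Pause partitions that already hold data, so poll() cannot fetch
        # (and later discard) more records for them; resume drained ones.
        self.consumer.pause([tp for tp, b in self.buffers.items() if b])
        self.consumer.resume([tp for tp, b in self.buffers.items() if not b])
        for tp, records in self.consumer.poll().items():
            self.buffers[tp].extend(records)

    def take(self, tp):
        # Hand one record to the caller; only now does its offset become
        # eligible for commit, since it has left the buffer.
        offset, value = self.buffers[tp].popleft()
        self.processed[tp] = offset + 1
        return value

    def commit_processed(self):
        # Commit only offsets of records that were taken out and processed.
        self.consumer.commit(dict(self.processed))
```

A caller would alternate `poll_into_buffers()` with `take()` on whichever partition currently has downstream demand, which is the choice Streams makes per record.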
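The waste Zahari points out can be made concrete with a toy model: whenever a partition is paused at the moment poll() returns, any records already fetched for it are dropped (mirroring the fetcher's "Not returning fetched records ... no longer fetchable" behavior). This is a deliberate simplification of the real client, just to show how frequent pause/resume inflates discarded prefetches.

```python
def simulate(fetch_batches, paused_at_poll):
    """Toy model of prefetch discarding.

    fetch_batches: list of dicts, partition -> number of prefetched records
    paused_at_poll: list of sets, partitions paused when each poll returns
    Returns (delivered, discarded) record counts.
    """
    delivered, discarded = 0, 0
    for batch, paused in zip(fetch_batches, paused_at_poll):
        for tp, n in batch.items():
            if tp in paused:
                discarded += n   # prefetched, then thrown away
            else:
                delivered += n
    return delivered, discarded
```

With two partitions prefetching 5 records per cycle and TP2 paused every other poll, a quarter of all fetched data is discarded and must be fetched again, which is the overlapping-fetch effect described in alpakka-kafka issue 549.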
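Option 2 from the proposal (drain prefetched records for paused partitions into a caller-supplied buffer instead of discarding them) might look roughly like the following. The `fetch_completed()` method on the consumer stub is entirely hypothetical; the real consumer exposes no such API, which is exactly what the proposed KIP would change.

```python
def poll_with_drain(consumer, paused, drain):
    """Sketch of the proposed behavior: records for paused partitions go
    into `drain` (a dict keyed by partition) rather than being thrown away.

    `consumer.fetch_completed()` is a stand-in for whatever internal
    completed-fetch state the real client holds; it is not a real API.
    """
    results = {}
    for tp, records in consumer.fetch_completed().items():
        if tp in paused:
            # Keep the prefetched data for the caller to use once the
            # partition is resumed (i.e. once demand is present again).
            drain.setdefault(tp, []).extend(records)
        else:
            results[tp] = records
    return results
```

The caller then owns the memory trade-off: the drained data is bounded by the prefetch size, matching the bounding argument in option 1.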