> You have one consumer that is quite slow, so let's say it calls poll
> every 5 seconds, while you need to call poll every 1 second to issue
> a heartbeat (these are made up numbers of course).
Heartbeats are sent by a background thread since 0.10.0.0. And there is
a second configuration, `max.poll.interval.ms`, that you can increase
for a slow consumer. Kafka Streams sets it to `Integer.MAX_VALUE`, for
example.

For the remainder, I agree that using a buffer on top might not be a
perfect solution. That is what I meant by:

> (not a 100% guarantee, depending on fetch size and max.poll.records
> etc., but with high probability)

However, I believe that if you configure the consumer accordingly, you
can drain the fetch buffer if you block on `poll()` long enough. I have
to admit that I am not familiar with the details of pipelining fetch
requests, though. The general idea is to make sure to drain the
internal buffer of the consumer before you call `pause()`. A rough Java
sketch of this buffer-on-top idea is appended at the very end of this
mail.

Curious to learn why this would not work? How does the pipelining of
fetch requests work in detail?

-Matthias

On 10/19/18 1:36 PM, Zahari Dichev wrote:
> Hi there Matthias,
>
> Very useful thoughts indeed. I have considered the exact same
> approach, but what worries me a bit is that I do not think it will
> certainly solve the issue. Imagine the following situation. You have
> one consumer that is quite slow, so let's say it calls poll every 5
> seconds, while you need to call poll every 1 second to issue a
> heartbeat (these are made up numbers of course).
>
> So our consumer calls poll at T0, grabs some data, puts it in a
> buffer, and calls pause on the topic partition. The fetcher tries to
> pipeline and issues a fetch request, and at some point the data
> arrives. At that point we have some data in the buffer and we can do
> whatever we want with it, but there is also some data living within
> the consumer/fetcher. Approximately one second later we call poll
> again because we need to. We are not getting any data because our
> partition is paused, and this is good, because if we got data we
> would not know what to do with it, as our client is still busy
> crunching the data from the first poll. So far so good. What happens,
> though, is that the prefetched data gets thrown away upon calling
> poll, as it is "no longer fetchable...". Maybe I am not fully
> understanding your suggested approach, but I don't think it would
> solve this problem.
>
> Zahari
>
> On Fri, Oct 19, 2018 at 8:10 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> Just my 2 cents.
>>
>> I am not 100% sure if we would need to change the consumer for this.
>> While I still think that KIP-349 might be valuable, it seems to be
>> complementary/orthogonal to the issue discussed here.
>>
>> For Kafka Streams, we have a related scenario, and what Kafka
>> Streams does is add its own buffer on top of the consumer. Thus, for
>> each `poll()` all data is put into this buffer, and now Streams can
>> decide which record to process first. For buffers that have data, we
>> can call `pause()` without losing fetched data (not a 100%
>> guarantee, depending on fetch size and max.poll.records etc., but
>> with high probability), and if a buffer gets empty we `resume()`
>> partitions.
>>
>> As Akka Streams builds on top of the consumer, it could implement a
>> similar pattern. Of course, one cannot use `auto.commit` on the
>> consumer; commits need to be managed manually (i.e., only data that
>> was taken out of the buffer and actually processed can be
>> committed).
>>
>> For the `MessageChooser` idea, I also still think it might be
>> useful, but it's unclear to me if this should be a consumer feature
>> or built on top of the consumer (maybe it could be a Streams
>> feature, as Streams is built on top of the consumer). Thoughts?
>>
>>
>> -Matthias
>>
>> On 10/18/18 9:27 AM, Jan Filipiak wrote:
>>> The idea for you would be that MessageChooser could hang on to the
>>> prefetched messages.
>>>
>>> ccing cmcc...@apache.org
>>>
>>> @Colin
>>> just for you to see that MessageChooser is a powerful abstraction.
>>>
>>> :)
>>>
>>> Best Jan
>>>
>>> On 18.10.2018 13:59, Zahari Dichev wrote:
>>>> Jan,
>>>>
>>>> Quite insightful indeed. I think your propositions are valid.
>>>>
>>>> Ryanne,
>>>>
>>>> I understand that consumers are using a pull model... And yes,
>>>> indeed, if a consumer is not ready for more records, it surely
>>>> should not call poll. Except that it needs to do so periodically
>>>> in order to indicate that it's alive. Forget about the
>>>> "backpressure"; I guess I was wrong with phrasing it that way, so
>>>> let's not get caught up on it.
>>>>
>>>> You say pause/resume can be used to prioritise certain
>>>> topics/partitions over others. And indeed this is the case. So
>>>> instead of thinking about it in terms of backpressure, let's put
>>>> it in a different way. The Akka Streams connector would like to
>>>> prioritise certain topics over others, using one consumer
>>>> instance. On top of that, add the detail that the priorities
>>>> change quite frequently (which translates to calling pause/resume
>>>> frequently). So, all that being said, what would be a proper way
>>>> to handle the situation without throwing the prefetched records
>>>> away when calling poll on a consumer that happens to have a topic
>>>> that was recently paused (and that might be un-paused soon)? Am I
>>>> the only one who considers that an actual problem with the use of
>>>> pause/resume? Not sure how to explain the situation in a better
>>>> way.
>>>>
>>>> Zahari
>>>>
>>>>
>>>> On Thu, Oct 18, 2018 at 9:46 AM Zahari Dichev <zaharidic...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks a lot Jan,
>>>>>
>>>>> I will read it.
>>>>>
>>>>> Zahari
>>>>>
>>>>> On Thu, Oct 18, 2018 at 9:31 AM Jan Filipiak <jan.filip...@trivago.com>
>>>>> wrote:
>>>>>
>>>>>> especially my suggestions ;)
>>>>>>
>>>>>> On 18.10.2018 08:30, Jan Filipiak wrote:
>>>>>>> Hi Zahari,
>>>>>>>
>>>>>>> would you be willing to scan through the KIP-349 discussion a
>>>>>>> little? I think it has suggestions that could be interesting
>>>>>>> for you.
>>>>>>>
>>>>>>> Best Jan
>>>>>>>
>>>>>>> On 16.10.2018 09:29, Zahari Dichev wrote:
>>>>>>>> Hi there Kafka developers,
>>>>>>>>
>>>>>>>> I am currently trying to find a solution to an issue that has
>>>>>>>> been manifesting itself in the Akka Streams implementation of
>>>>>>>> the Kafka connector. When it comes to consuming messages, the
>>>>>>>> implementation relies heavily on the fact that we can pause
>>>>>>>> and resume partitions. In some situations, when a single
>>>>>>>> consumer instance is shared among several streams, we might
>>>>>>>> end up frequently pausing and unpausing a set of topic
>>>>>>>> partitions, which is the main facility that allows us to
>>>>>>>> implement back pressure. This, however, has certain
>>>>>>>> disadvantages, especially when there are two consumers that
>>>>>>>> differ in terms of processing speed.
>>>>>>>>
>>>>>>>> To articulate the issue more clearly, imagine that a consumer
>>>>>>>> maintains assignments for two topic partitions *TP1* and
>>>>>>>> *TP2*. This consumer is shared by two streams - S1 and S2. So,
>>>>>>>> effectively, when we have demand from only one of the streams
>>>>>>>> - *S1* - we will pause one of the topic partitions - *TP2* -
>>>>>>>> and call *poll()* on the consumer to retrieve only the records
>>>>>>>> for the demanded topic partition - *TP1*. The result of that
>>>>>>>> is that all the records that have been prefetched for *TP2*
>>>>>>>> are now thrown away by the fetcher (*"Not returning fetched
>>>>>>>> records for assigned partition TP2 since it is no longer
>>>>>>>> fetchable"*). If we extrapolate that to multiple streams
>>>>>>>> sharing the same consumer, we might quickly end up in a
>>>>>>>> situation where we throw away prefetched data quite often.
>>>>>>>> This does not seem like the most efficient approach and in
>>>>>>>> fact produces quite a lot of overlapping fetch requests, as
>>>>>>>> illustrated in the following issue:
>>>>>>>>
>>>>>>>> https://github.com/akka/alpakka-kafka/issues/549
>>>>>>>>
>>>>>>>> I am writing this email to get some initial opinions on a KIP
>>>>>>>> I was thinking about. What if we give the clients of the
>>>>>>>> Consumer API a bit more control over what to do with this
>>>>>>>> prefetched data? Two options I am wondering about:
>>>>>>>>
>>>>>>>> 1. Introduce a configuration setting, such as
>>>>>>>> *"return-prefetched-data-for-paused-topic-partitions = false"*
>>>>>>>> (have to think of a better name), which when set to true will
>>>>>>>> return what is prefetched instead of throwing it away on
>>>>>>>> calling *poll()*. Since this is an amount of data bounded by
>>>>>>>> the maximum size of the prefetch, we can control the maximum
>>>>>>>> number of records returned. The client of the consumer API can
>>>>>>>> then be responsible for keeping that data around and using it
>>>>>>>> when appropriate (i.e. when demand is present).
>>>>>>>>
>>>>>>>> 2. Introduce a facility to pass in a buffer into which the
>>>>>>>> prefetched records are drained when poll is called and paused
>>>>>>>> partitions have some prefetched records.
>>>>>>>>
>>>>>>>> Any opinions on the matter are welcome. Thanks a lot!
>>>>>>>>
>>>>>>>> Zahari Dichev
>>>>>>
>>>>>
>>>>
>>
>>
>
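
P.S. To make the buffer-on-top idea more concrete, here is a minimal
Java sketch of the pattern, assuming a plain `KafkaConsumer`. The
broker address, group id, and topic names are placeholders, and the
`hasDemand()` / `process()` helpers are made-up stand-ins for
per-stream demand tracking and the actual record processing; this is
an illustration of the idea, not a complete implementation. Note that
it shares the caveat from above: a fetch that is still in flight for a
partition at the moment the partition is paused can still be dropped,
hence "high probability" rather than a guarantee.

import java.time.Duration;
import java.util.*;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

public class BufferedConsumerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "buffered-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        // Commits are managed manually: auto-commit could commit offsets
        // for records that still sit unprocessed in our own buffer.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // Tolerate slow downstream processing between poll() calls.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);

        Map<TopicPartition, Deque<ConsumerRecord<String, String>>> buffers =
            new HashMap<>();

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("topic-a", "topic-b"));

            while (true) {
                // poll() first, so the consumer's internal fetch buffer is
                // drained into our own buffers *before* anything is paused.
                ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(100));
                for (TopicPartition tp : records.partitions()) {
                    buffers.computeIfAbsent(tp, k -> new ArrayDeque<>())
                           .addAll(records.records(tp));
                }

                // Pause every partition that has buffered data; resume the
                // empty ones so the next poll() fetches for them again.
                List<TopicPartition> toPause = new ArrayList<>();
                List<TopicPartition> toResume = new ArrayList<>();
                for (TopicPartition tp : consumer.assignment()) {
                    Deque<ConsumerRecord<String, String>> buf = buffers.get(tp);
                    if (buf != null && !buf.isEmpty()) toPause.add(tp);
                    else toResume.add(tp);
                }
                consumer.pause(toPause);
                consumer.resume(toResume);

                // Hand buffered records to the streams according to their
                // demand, and commit only what was actually processed.
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (Map.Entry<TopicPartition, Deque<ConsumerRecord<String, String>>> e
                         : buffers.entrySet()) {
                    Deque<ConsumerRecord<String, String>> buf = e.getValue();
                    while (!buf.isEmpty() && hasDemand(e.getKey())) {
                        ConsumerRecord<String, String> rec = buf.poll();
                        process(rec);
                        offsets.put(e.getKey(), new OffsetAndMetadata(rec.offset() + 1));
                    }
                }
                if (!offsets.isEmpty()) {
                    consumer.commitSync(offsets);
                }
            }
        }
    }

    // Placeholders standing in for per-stream demand and real processing.
    private static boolean hasDemand(TopicPartition tp) { return true; }
    private static void process(ConsumerRecord<String, String> rec) { }
}

The ordering is the important detail: partitions are paused only after
poll() has already handed their prefetched records over to our
buffers, which is what keeps the data out of the fetcher's "no longer
fetchable" discard path most of the time.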