Thanks for your feedback, Matthias. Could you point me to the part where Kafka Streams deals with all of that so I can take a look? I will try your suggested approach and see whether it works for us before arguing my point further. One small thing to mention though: blocking on poll() forever is not something I am convinced we would like to do.
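To make that last point a bit more concrete, here is a minimal sketch (plain Java consumer API, made-up timeouts) of the two polling styles in question - blocking in poll() until data arrives, versus polling with a short timeout so that pause()/resume() can be adjusted between calls, which is closer to how we use the consumer at the moment:

    import java.time.Duration;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    class PollStyleSketch {
        // Style A: block in poll() until data arrives.
        static ConsumerRecords<byte[], byte[]> blockingPoll(KafkaConsumer<byte[], byte[]> consumer) {
            return consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
        }

        // Style B: short timeouts, so that between calls we can react to
        // changing demand by calling pause()/resume() on partitions.
        static void responsiveLoop(KafkaConsumer<byte[], byte[]> consumer) {
            while (true) {
                ConsumerRecords<byte[], byte[]> batch = consumer.poll(Duration.ofMillis(50));
                // hand records to whichever stream currently has demand,
                // then adjust pause()/resume() before the next poll
            }
        }
    }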
Zahari

On Sun, Oct 21, 2018 at 10:11 PM Matthias J. Sax <matth...@confluent.io> wrote:

> > You have one consumer that is quite slow, so let's say it calls poll every 5 seconds, while you need to call poll every 1 second to issue a heartbeat (these are made-up numbers of course).
>
> Heartbeats are sent by a background thread since 0.10.0.0. And there is a second configuration `max.poll.interval.ms` that you can increase for a slow consumer. Kafka Streams sets it to `Integer.MAX_VALUE`, for example.
>
> For the remainder, I agree that using a buffer on top might not be a perfect solution. That is what I meant by:
>
> > (not a 100% guarantee, depending on fetch size and max.poll.records etc, but with high probability)
>
> However, I believe that if you configure the consumer accordingly, you can drain the fetch buffer if you block on `poll()` forever.
>
> I have to admit that I am not familiar with the details of pipelining fetch requests, though. The general idea is to make sure to drain the internal buffer of the consumer before you call `pause()`.
>
> Curious to learn why this would not work. How does the pipelining of fetch requests work in detail?
>
>
> -Matthias
>
>
> On 10/19/18 1:36 PM, Zahari Dichev wrote:
> > Hi there Matthias,
> >
> > Very useful thoughts indeed. I have considered the exact same approach, but what worries me a bit is that I do not think it will certainly solve the issue. Imagine the following situation. You have one consumer that is quite slow, so let's say it calls poll every 5 seconds, while you need to call poll every 1 second to issue a heartbeat (these are made-up numbers of course).
> >
> > So our consumer calls poll at T0, grabs some data, puts it in a buffer and calls pause on the topic partition. The fetcher tries to pipeline and issues a fetch request, and at some point the data arrives. At that point we have some data in the buffer and we can do whatever we want with it, but there is also some data living within the consumer/fetcher. Approximately one second later we call poll again because we need to. We are not getting any data because our partition is paused, and this is good, because if we got data we would not know what to do with it as our client is still busy crunching the data from the first poll. So far so good. What happens though is that the pre-fetched data gets thrown away upon calling poll as it is "no longer fetchable...". Maybe I am not fully understanding your suggested approach, but I don't think it would solve this problem.
> >
> > Zahari
> >
> > On Fri, Oct 19, 2018 at 8:10 PM Matthias J. Sax <matth...@confluent.io> wrote:
> >
> >> Just my 2 cents.
> >>
> >> I am not 100% sure if we would need to change the consumer for this. While I still think that KIP-349 might be valuable, it seems to be complementary/orthogonal to the issue discussed here.
> >>
> >> For Kafka Streams, we have a related scenario, and what Kafka Streams does is add its own buffer on top of the consumer. Thus, for each `poll()` all data is put into this buffer, and now Streams can decide which record to process first. For buffers that have data, we can call `pause()` without losing fetched data (not a 100% guarantee, depending on fetch size and max.poll.records etc, but with high probability), and if a buffer gets empty we `resume()` partitions.
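Just to check my understanding of the pattern described above, here is a rough sketch (my own simplification, not actual Streams code) of buffering on top of the consumer and driving pause()/resume() from the buffer state:

    import java.time.Duration;
    import java.util.ArrayDeque;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Queue;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    class BufferedConsumerSketch {
        private final KafkaConsumer<byte[], byte[]> consumer;
        private final Map<TopicPartition, Queue<ConsumerRecord<byte[], byte[]>>> buffers = new HashMap<>();

        BufferedConsumerSketch(KafkaConsumer<byte[], byte[]> consumer) {
            this.consumer = consumer;
        }

        void pollOnce() {
            // Everything poll() returns goes into a per-partition buffer, so nothing
            // that was already handed to us is thrown away.
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<byte[], byte[]> r : records) {
                buffers.computeIfAbsent(new TopicPartition(r.topic(), r.partition()),
                                        tp -> new ArrayDeque<>()).add(r);
            }
            // Partitions whose buffer still holds data are paused; empty ones are resumed.
            for (Map.Entry<TopicPartition, Queue<ConsumerRecord<byte[], byte[]>>> e : buffers.entrySet()) {
                if (e.getValue().isEmpty()) {
                    consumer.resume(Collections.singleton(e.getKey()));
                } else {
                    consumer.pause(Collections.singleton(e.getKey()));
                }
            }
        }

        // Downstream takes records out of the buffer at its own pace.
        ConsumerRecord<byte[], byte[]> take(TopicPartition tp) {
            Queue<ConsumerRecord<byte[], byte[]>> q = buffers.get(tp);
            return q == null ? null : q.poll();
        }
    }

This covers the data that poll() has already returned; my remaining concern is about the data still sitting inside the consumer/fetcher when pause() is called, as described earlier in the thread.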
> >>
> >> As Akka Streams builds on top of the consumer, it could implement a similar pattern. Of course, one cannot use `auto.commit` on the consumer; commits need to be managed manually (i.e., only data that was taken out of the buffer and actually processed can be committed).
> >>
> >> For the `MessageChooser` idea, I also still think it might be useful, but it's unclear to me if this should be a consumer feature or built on top of the consumer (maybe it could be a Streams feature, as Streams is built on top of the consumer). Thoughts?
> >>
> >>
> >> -Matthias
> >>
> >> On 10/18/18 9:27 AM, Jan Filipiak wrote:
> >>> The idea for you would be that MessageChooser could hang on to the prefetched messages.
> >>>
> >>> ccing cmcc...@apache.org
> >>>
> >>> @Colin, just for you to see that MessageChooser is a powerful abstraction.
> >>>
> >>> :)
> >>>
> >>> Best Jan
> >>>
> >>> On 18.10.2018 13:59, Zahari Dichev wrote:
> >>>> Jan,
> >>>>
> >>>> Quite insightful indeed. I think your propositions are valid.
> >>>>
> >>>> Ryanne,
> >>>>
> >>>> I understand that consumers are using a pull model... And yes, indeed, if a consumer is not ready for more records it surely should not call poll - except that it needs to do so periodically in order to indicate that it's alive. Forget about the "backpressure"; I guess I was wrong with phrasing it that way, so let's not get caught up on it.
> >>>>
> >>>> You say pause/resume can be used to prioritise certain topics/partitions over others, and indeed this is the case. So instead of thinking about it in terms of backpressure, let's put it a different way. The Akka Streams connector would like to prioritise certain topics over others, using one consumer instance. On top of that, add the detail that the priorities change quite frequently (which translates to calling pause/resume frequently). All that being said, what would be a proper way to handle the situation without throwing the pre-fetched records away when calling poll on a consumer that happens to have a topic that was recently paused (and that might be un-paused soon)? Am I the only one who considers that an actual problem with the use of pause/resume? Not sure how to explain the situation in a better way.
> >>>>
> >>>> Zahari
> >>>>
> >>>> On Thu, Oct 18, 2018 at 9:46 AM Zahari Dichev <zaharidic...@gmail.com> wrote:
> >>>>
> >>>>> Thanks a lot Jan,
> >>>>>
> >>>>> I will read it.
> >>>>>
> >>>>> Zahari
> >>>>>
> >>>>> On Thu, Oct 18, 2018 at 9:31 AM Jan Filipiak <jan.filip...@trivago.com> wrote:
> >>>>>
> >>>>>> especially my suggestions ;)
> >>>>>>
> >>>>>> On 18.10.2018 08:30, Jan Filipiak wrote:
> >>>>>>> Hi Zahari,
> >>>>>>>
> >>>>>>> would you be willing to scan through the KIP-349 discussion a little? I think it has suggestions that could be interesting for you.
> >>>>>>>
> >>>>>>> Best Jan
> >>>>>>>
> >>>>>>> On 16.10.2018 09:29, Zahari Dichev wrote:
> >>>>>>>> Hi there Kafka developers,
> >>>>>>>>
> >>>>>>>> I am currently trying to find a solution to an issue that has been manifesting itself in the Akka Streams implementation of the Kafka connector. When it comes to consuming messages, the implementation relies heavily on the fact that we can pause and resume partitions.
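On the manual-commit point a few paragraphs up, here is a rough sketch (made-up names, assuming enable.auto.commit=false) of committing only offsets of records that were actually taken out of the buffer and processed; thread-safety between the processing and polling sides is ignored here:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    class ManualCommitSketch {
        private final KafkaConsumer<byte[], byte[]> consumer;  // configured with enable.auto.commit=false
        private final Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();

        ManualCommitSketch(KafkaConsumer<byte[], byte[]> consumer) {
            this.consumer = consumer;
        }

        // Called only once a record taken from the buffer has really been processed.
        void markProcessed(ConsumerRecord<byte[], byte[]> record) {
            toCommit.put(new TopicPartition(record.topic(), record.partition()),
                         new OffsetAndMetadata(record.offset() + 1));  // commit the next offset to read
        }

        // Called from the polling thread, e.g. once per poll-loop iteration.
        void maybeCommit() {
            if (!toCommit.isEmpty()) {
                consumer.commitSync(toCommit);
                toCommit.clear();
            }
        }
    }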
> >>>>>>>> In some situations, when a single consumer instance is shared among several streams, we might end up frequently pausing and unpausing a set of topic partitions, which is the main facility that allows us to implement back pressure. This, however, has certain disadvantages, especially when there are two consumers that differ in terms of processing speed.
> >>>>>>>>
> >>>>>>>> To articulate the issue more clearly, imagine that a consumer maintains assignments for two topic partitions *TP1* and *TP2*. This consumer is shared by two streams - S1 and S2. So effectively, when we have demand from only one of the streams - *S1*, we will pause one of the topic partitions, *TP2*, and call *poll()* on the consumer to only retrieve the records for the demanded topic partition - *TP1*. The result of that is that all the records that have been prefetched for *TP2* are now thrown away by the fetcher (*"Not returning fetched records for assigned partition TP2 since it is no longer fetchable"*). If we extrapolate that to multiple streams sharing the same consumer, we might quickly end up in a situation where we throw prefetched data away quite often. This does not seem like the most efficient approach, and in fact it produces quite a lot of overlapping fetch requests, as illustrated in the following issue:
> >>>>>>>>
> >>>>>>>> https://github.com/akka/alpakka-kafka/issues/549
> >>>>>>>>
> >>>>>>>> I am writing this email to get some initial opinion on a KIP I was thinking about. What if we give the clients of the Consumer API a bit more control over what to do with this prefetched data? Two options I am wondering about:
> >>>>>>>>
> >>>>>>>> 1. Introduce a configuration setting, such as *"return-prefetched-data-for-paused-topic-partitions"* (have to think of a better name), which when set to true will return what is prefetched instead of throwing it away on calling *poll()*. Since this amount of data is bounded by the maximum size of the prefetch, we can control the maximum number of records returned. The client of the Consumer API can then be responsible for keeping that data around and using it when appropriate (i.e. when demand is present).
> >>>>>>>>
> >>>>>>>> 2. Introduce a facility to pass in a buffer into which the prefetched records are drained when poll is called and paused partitions have some prefetched records.
> >>>>>>>>
> >>>>>>>> Any opinions on the matter are welcome. Thanks a lot!
> >>>>>>>>
> >>>>>>>> Zahari Dichev
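To make the two options above a bit more concrete, here is a hypothetical sketch of what the caller's side could look like. Neither the configuration key nor the poll() overload exists today; the names are invented purely for discussion:

    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    class PrefetchProposalSketch {
        void sketch(KafkaConsumer<byte[], byte[]> consumer, Properties props) {
            // Option 1 (HYPOTHETICAL config key, name to be decided): poll() returns
            // prefetched records of paused partitions instead of discarding them.
            props.put("return.prefetched.data.for.paused.partitions", "true");

            // Option 2 (HYPOTHETICAL overload, does not exist in the Consumer API today):
            // the caller passes a buffer and poll() drains into it whatever was already
            // prefetched for partitions that are currently paused.
            List<ConsumerRecord<byte[], byte[]>> drained = new ArrayList<>();
            // consumer.poll(Duration.ofMillis(100), drained);
            // The client keeps `drained` around and serves it to the paused stream
            // once demand arrives and the partition is resumed.
        }
    }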