It's spread out across multiple classes... A good starting point is here: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L806
It implements the main loop that polls, calls addRecordsToTasks() (ie, puts the records into buffers), and processes records. pause/resume is done here: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L720 and https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L362

Not blocking forever makes sense of course; I did not mean it literally. However, longer poll() blocking times should allow you to drain the consumer buffer better.

-Matthias
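In code, the pattern being discussed amounts to roughly the following. This is a minimal sketch, not the actual StreamThread/StreamTask logic: the class and method names (BufferingLoop, runOnce, process) are made up, and rebalance listeners, error handling, and the real processing are all elided.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class BufferingLoop {

    // One application-level buffer per assigned partition, on top of the consumer.
    private final Map<TopicPartition, Queue<ConsumerRecord<byte[], byte[]>>> buffers = new HashMap<>();

    void runOnce(Consumer<byte[], byte[]> consumer) {
        // A longer timeout gives the consumer a better chance to hand over
        // everything its internal fetch buffer holds before anything is paused.
        ConsumerRecords<byte[], byte[]> polled = consumer.poll(Duration.ofSeconds(1));

        // Move every fetched record from poll() into our own buffers.
        for (ConsumerRecord<byte[], byte[]> record : polled) {
            TopicPartition tp = new TopicPartition(record.topic(), record.partition());
            buffers.computeIfAbsent(tp, k -> new ArrayDeque<>()).add(record);
        }

        // Pause partitions whose buffer still has data, resume the empty ones.
        List<TopicPartition> toPause = new ArrayList<>();
        List<TopicPartition> toResume = new ArrayList<>();
        for (Map.Entry<TopicPartition, Queue<ConsumerRecord<byte[], byte[]>>> e : buffers.entrySet()) {
            (e.getValue().isEmpty() ? toResume : toPause).add(e.getKey());
        }
        consumer.pause(toPause);
        consumer.resume(toResume);

        // Process from the buffers; choosing which buffer to drain first is
        // where record/partition prioritisation can happen. Commit only the
        // offsets of records actually taken out of a buffer and processed.
        for (Map.Entry<TopicPartition, Queue<ConsumerRecord<byte[], byte[]>>> e : buffers.entrySet()) {
            ConsumerRecord<byte[], byte[]> record = e.getValue().poll();
            if (record != null) {
                process(record);
                consumer.commitSync(Collections.singletonMap(
                        e.getKey(), new OffsetAndMetadata(record.offset() + 1)));
            }
        }
    }

    private void process(ConsumerRecord<byte[], byte[]> record) { /* application logic */ }
}
```

The key points are that records always move from poll() into the application-level buffers before any pause() call, and that auto-commit must be off, since only records actually consumed from the buffers may be committed.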
On 10/21/18 12:54 PM, Zahari Dichev wrote:
> Thanks for your feedback Matthias. Do you think you can point me to the part where Kafka Streams deals with all of that, so I can take a look? I will try to see whether your suggested approach works for us before trying to argue my point further. Just a small thing to mention though: blocking on the poll forever is not something that I am convinced we would like to do.
>
> Zahari
>
> On Sun, Oct 21, 2018 at 10:11 PM Matthias J. Sax <matth...@confluent.io> wrote:
>
>>> You have one consumer that is quite slow, so let's say it calls poll every 5 seconds, while you need to call poll every 1 second to issue a heartbeat (these are made up numbers of course).
>>
>> Heartbeats are sent by a background thread since 0.10.0.0. And there is a second configuration `max.poll.interval.ms` that you can increase for a slow consumer. Kafka Streams sets it to `Integer.MAX_VALUE`, for example.
>>
>> For the remainder, I agree that using a buffer on top might not be a perfect solution. That is what I meant by:
>>
>>> (not a 100% guarantee, depending on fetch size and max.poll.record etc, but with high probability)
>>
>> However, I believe that if you configure the consumer accordingly, you can drain the fetch buffer if you block on `poll()` forever.
>>
>> I have to admit that I am not familiar with the details of pipelining fetch requests, though. The general idea is to make sure to drain the internal buffer of the consumer before you call `pause()`.
>>
>> Curious to learn why this would not work. How does the pipelining of fetch requests work in detail?
>>
>>
>> -Matthias
>>
>>
>> On 10/19/18 1:36 PM, Zahari Dichev wrote:
>>> Hi there Matthias,
>>>
>>> Very useful thoughts indeed. I have considered the exact same approach, but what worries me a bit is that I do not think it will certainly solve the issue. Imagine the following situation. You have one consumer that is quite slow, so let's say it calls poll every 5 seconds, while you need to call poll every 1 second to issue a heartbeat (these are made up numbers of course).
>>>
>>> So our consumer calls poll at T0, grabs some data, puts it in a buffer, and calls pause on the topic partition. The fetcher tries to pipeline and issues a fetch request, and at some point the data arrives. At that point we have some data in the buffer and we can do whatever we want with it, but there is also some data living within the consumer/fetcher. Approximately one second later we call poll again because we need to. We are not getting any data because our partition is paused, and this is good, because if we got data we would not know what to do with it, as our client is still busy crunching the data from the first poll. So far so good. What happens though is that the pre-fetched data gets thrown away upon calling poll, as it is "no longer fetchable...". Maybe I am not fully understanding your suggested approach, but I don't think it would solve this problem.
>>>
>>> Zahari
>>>
>>> On Fri, Oct 19, 2018 at 8:10 PM Matthias J. Sax <matth...@confluent.io> wrote:
>>>
>>>> Just my 2 cents.
>>>>
>>>> I am not 100% sure if we would need to change the consumer for this. While I still think that KIP-349 might be valuable, it seems to be complementary/orthogonal to the issue discussed here.
>>>>
>>>> For Kafka Streams, we have a related scenario, and what Kafka Streams does is add its own buffer on top of the consumer. Thus, for each `poll()` all data is put into this buffer, and now Streams can decide which record to process first. For buffers that have data, we can call `pause()` without losing fetched data (not a 100% guarantee, depending on fetch size and max.poll.record etc, but with high probability), and if a buffer gets empty we `resume()` partitions.
>>>>
>>>> As Akka Streams builds on top of the consumer, it could implement a similar pattern. Of course, one cannot use `auto.commit` on the consumer; commits need to be managed manually (ie, only data that was taken out of the buffer and actually processed can be committed).
>>>>
>>>> For the `MessageChooser` idea, I also still think it might be useful, but it's unclear to me if this should be a consumer feature or built on top of the consumer (maybe it could be a Streams feature, as Streams is built on top of the consumer). Thoughts?
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 10/18/18 9:27 AM, Jan Filipiak wrote:
>>>>> The idea for you would be that MessageChooser could hang on to the prefetched messages.
>>>>>
>>>>> ccing cmcc...@apache.org
>>>>>
>>>>> @Colin
>>>>> just for you to see that MessageChooser is a powerful abstraction.
>>>>>
>>>>> :)
>>>>>
>>>>> Best Jan
>>>>>
>>>>> On 18.10.2018 13:59, Zahari Dichev wrote:
>>>>>> Jan,
>>>>>>
>>>>>> Quite insightful indeed. I think your propositions are valid.
>>>>>>
>>>>>> Ryanne,
>>>>>>
>>>>>> I understand that consumers are using a pull model... And yes, indeed, if a consumer is not ready for more records it surely should not call poll. Except that it needs to do so periodically in order to indicate that it is alive. Forget about the "backpressure"; I guess I was wrong with phrasing it that way, so let's not get caught up on it.
>>>>>>
>>>>>> You say pause/resume can be used to prioritise certain topics/partitions over others. And indeed this is the case. So instead of thinking about it in terms of backpressure, let's put it a different way. The Akka Streams connector would like to prioritise certain topics over others, using one consumer instance. On top of that, add the detail that the priorities change quite frequently (which translates to calling pause/resume frequently). So all that being said, what would be a proper way to handle the situation without throwing the pre-fetched records away when calling poll on a consumer that happens to have a topic that was recently paused (and that might be un-paused soon)? Am I the only one who considers that an actual problem with the use of pause/resume? Not sure how to explain the situation in a better way..
>>>>>>
>>>>>> Zahari
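Condensed into consumer calls, the sequence described above looks roughly like this. A minimal sketch, assuming a single consumer with one assigned partition; `tp1`, the topic name, and the timeouts are made-up illustrations of the scenario, not code from Alpakka or Kafka.

```java
import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

public class PausedPrefetchScenario {

    static void demonstrate(Consumer<byte[], byte[]> consumer) {
        TopicPartition tp1 = new TopicPartition("some-topic", 0);

        // T0: grab whatever is available, start processing it, and pause the
        // partition because there is no downstream demand for more yet.
        ConsumerRecords<byte[], byte[]> first = consumer.poll(Duration.ofMillis(100));
        consumer.pause(Collections.singleton(tp1));
        // Meanwhile the fetcher may already have a pipelined fetch request in
        // flight; its response gets buffered inside the consumer.

        // ~T0 + 1s: poll again while still busy, to stay within
        // max.poll.interval.ms. tp1 is paused, so nothing is returned for it --
        // and the records the fetcher had buffered for tp1 are dropped as
        // "no longer fetchable".
        ConsumerRecords<byte[], byte[]> second = consumer.poll(Duration.ofMillis(100));

        // Later: demand comes back, but the dropped records now have to be
        // fetched from the broker all over again.
        consumer.resume(Collections.singleton(tp1));
    }
}
```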
>>>>>> On Thu, Oct 18, 2018 at 9:46 AM Zahari Dichev <zaharidic...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks a lot Jan,
>>>>>>>
>>>>>>> I will read it.
>>>>>>>
>>>>>>> Zahari
>>>>>>>
>>>>>>> On Thu, Oct 18, 2018 at 9:31 AM Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>>>>
>>>>>>>> especially my suggestions ;)
>>>>>>>>
>>>>>>>> On 18.10.2018 08:30, Jan Filipiak wrote:
>>>>>>>>> Hi Zahari,
>>>>>>>>>
>>>>>>>>> would you be willing to scan through the KIP-349 discussion a little? I think it has suggestions that could be interesting for you.
>>>>>>>>>
>>>>>>>>> Best Jan
>>>>>>>>>
>>>>>>>>> On 16.10.2018 09:29, Zahari Dichev wrote:
>>>>>>>>>> Hi there Kafka developers,
>>>>>>>>>>
>>>>>>>>>> I am currently trying to find a solution to an issue that has been manifesting itself in the Akka Streams implementation of the Kafka connector. When it comes to consuming messages, the implementation relies heavily on the fact that we can pause and resume partitions. In some situations when a single consumer instance is shared among several streams, we might end up frequently pausing and unpausing a set of topic partitions, which is the main facility that allows us to implement back pressure. This however has certain disadvantages, especially when there are two consumers that differ in terms of processing speed.
>>>>>>>>>>
>>>>>>>>>> To articulate the issue more clearly, imagine that a consumer maintains assignments for two topic partitions *TP1* and *TP2*. This consumer is shared by two streams - S1 and S2. So effectively, when we have demand from only one of the streams - *S1*, we will pause one of the topic partitions - *TP2* - and call *poll()* on the consumer to only retrieve the records for the demanded topic partition - *TP1*. The result of that is that all the records that have been prefetched for *TP2* are now thrown away by the fetcher (*"Not returning fetched records for assigned partition TP2 since it is no longer fetchable"*). If we extrapolate that to multiple streams sharing the same consumer, we might quickly end up in a situation where we throw prefetched data away quite often. This does not seem like the most efficient approach, and in fact it produces quite a lot of overlapping fetch requests, as illustrated in the following issue:
>>>>>>>>>>
>>>>>>>>>> https://github.com/akka/alpakka-kafka/issues/549
>>>>>>>>>>
>>>>>>>>>> I am writing this email to get some initial opinions on a KIP I was thinking about. What if we give the clients of the Consumer API a bit more control over what to do with this prefetched data? Two options I am wondering about (see the sketch below):
>>>>>>>>>>
>>>>>>>>>> 1. Introduce a configuration setting, such as *"return-prefetched-data-for-paused-topic-partitions = false"* (have to think of a better name), which when set to true will return what is prefetched instead of throwing it away on calling *poll()*. Since this is an amount of data that is bounded by the maximum size of the prefetch, we can control the maximum number of records returned. The client of the consumer API can then be responsible for keeping that data around and using it when appropriate (i.e. when demand is present).
>>>>>>>>>>
>>>>>>>>>> 2. Introduce a facility to pass in a buffer into which the prefetched records are drained when poll is called and paused partitions have some prefetched records.
>>>>>>>>>>
>>>>>>>>>> Any opinions on the matter are welcome. Thanks a lot!
>>>>>>>>>>
>>>>>>>>>> Zahari Dichev
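For what it's worth, option 2 from the proposal could take a shape like the sketch below. This is purely illustrative: no such overload exists in the Kafka consumer API, and `PrefetchAwareConsumer` is a hypothetical name, not the actual interface anyone has proposed.

```java
import java.time.Duration;
import java.util.Map;
import java.util.Queue;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

public interface PrefetchAwareConsumer<K, V> {

    /**
     * Hypothetical poll() variant: behaves like the regular poll(), but any
     * records already prefetched for partitions that are paused at the time
     * of the call are drained into the caller-supplied overflow buffers
     * instead of being thrown away. The caller can then hold on to them
     * until demand for those partitions returns.
     */
    ConsumerRecords<K, V> poll(Duration timeout,
                               Map<TopicPartition, Queue<ConsumerRecord<K, V>>> pausedOverflow);
}
```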