Ryanne Dolan <ryannedo...@gmail.com> wrote:
> It sounds to me like this problem is due to Akka attempting to implement
> additional backpressure on top of the Consumer API. I'd suggest they not do
> that, and then this problem goes away.

Imagine a very simple case where you want to consume from three partitions at 
about the same rate, but the messages in those partitions have different 
average sizes.  You can't keep the consumption rate the same without 
occassionally pausing the fast partitions.

I think we should encourage people to use pause and resume when it's 
appropriate.  After all, Kafka Streams uses pause and resume, for much the same 
reasons.  It's demonstrably a simple and useful API.  There's nothing wrong 
with frameworks implementing backpressure.

Jan Filipiak <jan.filip...@trivago.com> wrote:
 > The idea for you would be that Messagechooser could hang on to the 
 > prefetched messages.
 > 
 > ccing cmcc...@apache.org
 > 
 > @Collin
 > just for you to see that MessageChooser is a powerfull abstraction.

Samza has a lot of powerful abstractions.  However, in this case, you can get 
everything you need just by using pause and resume on partitions.  And then 
Zahari can implement some powerful abstractions of his own, in Akka Streams. :)

Matthias J. Sax <matth...@confluent.io> wrote:
> Heartbeats are send by a backgroud thread since 0.10.0.0. And there is a
> second configuration `max.poll.interval.ms` that you can increase for a
> slow consumer. Kafka Streams sets it to `Integer.MAX_VALUE` for example.

Right, that's a very important point.  Heartbeats are sent by a background 
thread now.

Zahari Dichev <zaharidic...@gmail.com> wrote:
> Just a mall thing to mention though, blocking on
> the poll forever is not something that I am convinced we would like to do.

There is a KafkaConsumer#wakeup method which might help here.
>     * Wakeup the consumer. This method is thread-safe and is useful in 
> particular to abort a long poll.

best,
Colin


On Sun, Oct 21, 2018, at 14:59, Matthias J. Sax wrote:
> It's spread out multiple classes...
> 
> Good starting point is here:
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L806
> 
> It implements the main-loop that polls, addRecordsToTasks() (ie, put the
> into buffers), and processes record.
> 
> pause/resume is done here:
> 
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L720
> 
> and
> 
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L362
> 
> 
> Not blocking forever makes sense of course. I did not mean in literally.
> However, longer poll() blocking times, should allow you to drain the
> consumer buffer better.
> 
> 
> -Matthias
> 
> On 10/21/18 12:54 PM, Zahari Dichev wrote:
> > Thanks for your feedback Matthias, Do you think you can point me to the
> > part where Kafka streams deals with all of that so I can take a look. Will
> > try and see whether your suggested approach works for us before trying to
> > argue my point further. Just a mall thing to mention though, blocking on
> > the poll forever is not something that I am convinced we would like to do.
> > 
> > Zahari
> > 
> > On Sun, Oct 21, 2018 at 10:11 PM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> > 
> >>> You have one consumer that is quite
> >>>> slow so lets say it call poll every 5 seconds, while you need to call
> >> poll
> >>>> every 1 second to issue a heartbeat (these are made up numbers of
> >> course).
> >>
> >> Heartbeats are send by a backgroud thread since 0.10.0.0. And there is a
> >> second configuration `max.poll.interval.ms` that you can increase for a
> >> slow consumer. Kafka Streams sets it to `Integer.MAX_VALUE` for example.
> >>
> >> For the remainder, I agree, that using a buffer on top might not a
> >> perfect solution. That is, what I meant by:
> >>
> >>> (not 100% percent guarantee,
> >>> depending on fetch size and max.poll.record etc, but with high
> >>> probability)
> >>
> >> However, I believe, that if you configure the consumer accordingly, you
> >> can to drain the fetch buffer if you block on `poll()` forever.
> >>
> >> I have to admit, than I am not familiar with the details of pipelining
> >> fetch requests thought. The general idea is, to make sure to drain the
> >> internal buffer of the consumer, before you call `pause()`.
> >>
> >> Curious to to learn why this would not work? How does the pipelining of
> >> fetch requests works in detail?
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 10/19/18 1:36 PM, Zahari Dichev wrote:
> >>> Hi there Matthias,
> >>>
> >>> Very useful thoughts indeed. I have considered the exact same approach
> >> but
> >>> what worries me a bit is that I do not think that will certainly solve
> >> the
> >>> issue. Imagine the following situation. You have one consumer that is
> >> quite
> >>> slow so lets say it call poll every 5 seconds, while you need to call
> >> poll
> >>> every 1 second to issue a heartbeat (these are made up numbers of
> >> course).
> >>>
> >>> So our consumer calls poll at T0 grabs some data and puts it in a buffer
> >>> and calls pause on the topic partition. The fetcher tries to pipeline and
> >>> issues a fetch request and at some point the data arrives. At that point
> >> we
> >>> have some data in the buffer and we can do whatever we want with it, but
> >>> there is also some data living within the consumer/fetcher. Approximately
> >>> one second later we call poll again because we need to. We are not
> >> getting
> >>> any data because our partition is paused and this is good because if we
> >> got
> >>> data we would not know what to do with it as our client is still busy
> >>> crunching the data from the first poll. So far so good. What happens
> >> though
> >>> is that the pre-fetched data gets thrown away upon calling poll as its
> >> "no
> >>> longer fetchable...".  Maybe I am not fully understanding your suggested
> >>> approach, but I dont think it would solve this problem.
> >>>
> >>> Zahari
> >>>
> >>> On Fri, Oct 19, 2018 at 8:10 PM Matthias J. Sax <matth...@confluent.io>
> >>> wrote:
> >>>
> >>>> Just my 2 cents.
> >>>>
> >>>> I am not 100% sure if we would need to change the consumer for this.
> >>>> While I still think, that KIP-349 might be valuable, it seems to be
> >>>> complementary/orthogonal to the issue discussed here.
> >>>>
> >>>> For Kafka Streams, we have a related scenario and what Kafka Streams
> >>>> does is, to add its own buffer on top of the consumer. Thus, for each
> >>>> `poll()` all data is put into this buffer, and now Streams can decide
> >>>> which record to process first. For buffers that have data, we can call
> >>>> `pause()` without loosing fetched data (not 100% percent guarantee,
> >>>> depending on fetch size and max.poll.record etc, but with high
> >>>> probability) and if a buffer gets empty we `resume()` partitions.
> >>>>
> >>>> As Akka Streams builds on top of the consumer it could implement a
> >>>> similar pattern. Of course, one cannot use `auto.commit` on the
> >>>> consumer, but commits need to be manged manually, (ie, only data that
> >>>> was take out of the buffer and actually was processed can be committed).
> >>>>
> >>>> For the `MessageChooser` idea, I also still think it might be useful,
> >>>> but it's unclear to me if this should be a consumer feature or build on
> >>>> top of the consumer (maybe it could be a Streams feature, as Streams is
> >>>> build on top of the consumer). Thoughts?
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 10/18/18 9:27 AM, Jan Filipiak wrote:
> >>>>> The idea for you would be that Messagechooser could hang on to the
> >>>>> prefetched messages.
> >>>>>
> >>>>> ccing cmcc...@apache.org
> >>>>>
> >>>>> @Collin
> >>>>> just for you to see that MessageChooser is a powerfull abstraction.
> >>>>>
> >>>>> :)
> >>>>>
> >>>>> Best jan
> >>>>>
> >>>>> On 18.10.2018 13:59, Zahari Dichev wrote:
> >>>>>> Jan,
> >>>>>>
> >>>>>> Quite insightful indeed. I think your propositions are valid.
> >>>>>>
> >>>>>> Ryanne,
> >>>>>>
> >>>>>> I understand that consumers are using a pull model... And yes, indeed
> >>>> if a
> >>>>>> consumer is not ready for more records it surely should not call poll.
> >>>>>> Except that it needs to do so periodically in order to indicate that
> >> its
> >>>>>> live. Forget about the "backpressure", I guess I was wrong with
> >> phrasing
> >>>>>> this so lets not get caught up on it.
> >>>>>>
> >>>>>> You say pause/resume can be used to prioritise certain
> >> topics/partitions
> >>>>>> over others. And indeed this is the case. So instead of thinking about
> >>>> it
> >>>>>> in terms of backpressure, lets put it in a different way. The Akka
> >>>> streams
> >>>>>> connector would like to prioritise certain topics over others, using
> >>>> once
> >>>>>> consumer instance. On top of that, add the detail that the priorities
> >>>>>> change quite frequently (which translates to calling pause/resume
> >>>>>> frequently). So all that being said, what would be a proper way to
> >>>> handle
> >>>>>> the situation without throwing the pre-fetched records away when
> >> calling
> >>>>>> poll on a consumer that happens to have a topic that was recently
> >> paused
> >>>>>> (and that might be un-paused soon )? Am I the only one who considers
> >>>> that
> >>>>>> an actual problem with the use os pause/resume ? Not sure how to
> >> explain
> >>>>>> the situation in a better way..
> >>>>>>
> >>>>>> Zahari
> >>>>>>
> >>>>>>
> >>>>>> On Thu, Oct 18, 2018 at 9:46 AM Zahari Dichev <zaharidic...@gmail.com
> >>>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Thanks a lot Jan,
> >>>>>>>
> >>>>>>> I will read it.
> >>>>>>>
> >>>>>>> Zahari
> >>>>>>>
> >>>>>>> On Thu, Oct 18, 2018 at 9:31 AM Jan Filipiak <
> >> jan.filip...@trivago.com
> >>>>>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> especially my suggestions ;)
> >>>>>>>>
> >>>>>>>> On 18.10.2018 08:30, Jan Filipiak wrote:
> >>>>>>>>> Hi Zahari,
> >>>>>>>>>
> >>>>>>>>> would you be willing to scan through the KIP-349 discussion a
> >> little?
> >>>>>>>>> I think it has suggestions that could be interesting for you
> >>>>>>>>>
> >>>>>>>>> Best Jan
> >>>>>>>>>
> >>>>>>>>> On 16.10.2018 09:29, Zahari Dichev wrote:
> >>>>>>>>>> Hi there Kafka developers,
> >>>>>>>>>>
> >>>>>>>>>> I am currently trying to find a solution to an issue that has been
> >>>>>>>>>> manifesting itself in the Akka streams implementation of the Kafka
> >>>>>>>>>> connector. When it comes to consuming messages, the implementation
> >>>>>>>> relies
> >>>>>>>>>> heavily on the fact that we can pause and resume partitions. In
> >> some
> >>>>>>>>>> situations when a single consumer instance is shared among several
> >>>>>>>>>> streams,
> >>>>>>>>>> we might end up with frequently pausing and unpausing a set of
> >> topic
> >>>>>>>>>> partitions, which is the main facility that allows us to implement
> >>>> back
> >>>>>>>>>> pressure. This however has certain disadvantages, especially when
> >>>>>>>>>> there are
> >>>>>>>>>> two consumers that differ in terms of processing speed.
> >>>>>>>>>>
> >>>>>>>>>> To articulate the issue more clearly, imagine that a consumer
> >>>> maintains
> >>>>>>>>>> assignments for two topic partitions *TP1* and *TP2*. This
> >> consumer
> >>>> is
> >>>>>>>>>> shared by two streams - S1 and S2. So effectively when we have
> >>>> demand
> >>>>>>>>>> from
> >>>>>>>>>> only one of the streams - *S1*, we will pause one of the topic
> >>>>>>>> partitions
> >>>>>>>>>> *TP2* and call *poll()* on the consumer to only retrieve the
> >> records
> >>>>>>>> for
> >>>>>>>>>> the demanded topic partition - *TP1*. The result of that is all
> >> the
> >>>>>>>>>> records
> >>>>>>>>>> that have been prefetched for *TP2* are now thrown away by the
> >>>> fetcher
> >>>>>>>>>> ("*Not
> >>>>>>>>>> returning fetched records for assigned partition TP2 since it is
> >> no
> >>>>>>>>>> longer
> >>>>>>>>>> fetchable"*). If we extrapolate that to multiple streams sharing
> >> the
> >>>>>>>> same
> >>>>>>>>>> consumer, we might quickly end up in a situation where we throw
> >>>>>>>>>> prefetched
> >>>>>>>>>> data quite often. This does not seem like the most efficient
> >>>> approach
> >>>>>>>> and
> >>>>>>>>>> in fact produces quite a lot of overlapping fetch requests as
> >>>>>>>> illustrated
> >>>>>>>>>> in the following issue:
> >>>>>>>>>>
> >>>>>>>>>> https://github.com/akka/alpakka-kafka/issues/549
> >>>>>>>>>>
> >>>>>>>>>> I am writing this email to get some initial opinion on a KIP I was
> >>>>>>>>>> thinking
> >>>>>>>>>> about. What if we give the clients of the Consumer API a bit more
> >>>>>>>> control
> >>>>>>>>>> of what to do with this prefetched data. Two options I am
> >> wondering
> >>>>>>>>>> about:
> >>>>>>>>>>
> >>>>>>>>>> 1. Introduce a configuration setting, such as*
> >>>>>>>>>> "return-prefetched-data-for-paused-topic-partitions = false"*
> >> (have
> >>>> to
> >>>>>>>>>> think of a better name), which when set to true will return what
> >> is
> >>>>>>>>>> prefetched instead of throwing it away on calling *poll()*. Since
> >>>> this
> >>>>>>>> is
> >>>>>>>>>> amount of data that is bounded by the maximum size of the
> >> prefetch,
> >>>> we
> >>>>>>>>>> can
> >>>>>>>>>> control what is the most amount of records returned. The client of
> >>>> the
> >>>>>>>>>> consumer API can then be responsible for keeping that data around
> >>>> and
> >>>>>>>> use
> >>>>>>>>>> it when appropriate (i.e. when demand is present)
> >>>>>>>>>>
> >>>>>>>>>> 2. Introduce a facility to pass in a buffer into which the
> >>>> prefetched
> >>>>>>>>>> records are drained when poll is called and paused partitions have
> >>>> some
> >>>>>>>>>> prefetched records.
> >>>>>>>>>>
> >>>>>>>>>> Any opinions on the matter are welcome. Thanks a lot !
> >>>>>>>>>>
> >>>>>>>>>> Zahari Dichev
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> > 
> 
> Email had 1 attachment:
> + signature.asc
>   1k (application/pgp-signature)

Reply via email to