Thanks for your feedback, Matthias. Could you point me to the part of
Kafka Streams that deals with all of that so I can take a look? I will
try to see whether your suggested approach works for us before arguing
my point further. Just a small thing to mention though: blocking on
poll forever is not something I am convinced we would like to do.

Zahari

On Sun, Oct 21, 2018 at 10:11 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> > You have one consumer that is quite slow, so let's say it calls poll
> > every 5 seconds, while you need to call poll every 1 second to issue
> > a heartbeat (these are made up numbers of course).
>
> Heartbeats are sent by a background thread since 0.10.0.0. And there is a
> second configuration, `max.poll.interval.ms`, that you can increase for a
> slow consumer. Kafka Streams sets it to `Integer.MAX_VALUE`, for example.
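>
> For illustration, a slow consumer could be configured roughly like this
> (an untested sketch; the group name and values are made up):
>
>     import java.util.Properties;
>     import org.apache.kafka.clients.consumer.ConsumerConfig;
>     import org.apache.kafka.clients.consumer.KafkaConsumer;
>     import org.apache.kafka.common.serialization.StringDeserializer;
>
>     Properties props = new Properties();
>     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>     props.put(ConsumerConfig.GROUP_ID_CONFIG, "slow-consumer-group");
>     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>         StringDeserializer.class.getName());
>     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>         StringDeserializer.class.getName());
>     // heartbeats come from a background thread, so session.timeout.ms
>     // only bounds the gap between heartbeats, not between poll() calls
>     props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
>     // allow arbitrarily long gaps between poll() calls, as Streams does
>     props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
>         String.valueOf(Integer.MAX_VALUE));
>     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);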
>
> For the remainder, I agree that using a buffer on top might not be a
> perfect solution. That is what I meant by:
>
> > (not a 100% guarantee,
> > depending on fetch size and max.poll.records etc., but with high
> > probability)
>
> However, I believe that if you configure the consumer accordingly, you
> can drain the fetch buffer if you block on `poll()` forever.
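>
> For instance, with `max.poll.records` set large enough that a single
> `poll()` can hand back everything sitting in the fetch buffer
> (continuing the sketch above; illustrative value only):
>
>     props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10000");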
>
> I have to admit that I am not familiar with the details of pipelining
> fetch requests though. The general idea is to make sure to drain the
> internal buffer of the consumer before you call `pause()`.
>
> Curious to learn why this would not work. How does the pipelining of
> fetch requests work in detail?
>
>
> -Matthias
>
>
> On 10/19/18 1:36 PM, Zahari Dichev wrote:
> > Hi there Matthias,
> >
> > Very useful thoughts indeed. I have considered the exact same approach,
> > but what worries me a bit is that I do not think it will certainly
> > solve the issue. Imagine the following situation. You have one consumer
> > that is quite slow, so let's say it calls poll every 5 seconds, while
> > you need to call poll every 1 second to issue a heartbeat (these are
> > made up numbers of course).
> >
> > So our consumer calls poll at T0, grabs some data, puts it in a buffer
> > and calls pause on the topic partition. The fetcher tries to pipeline
> > and issues a fetch request, and at some point the data arrives. At that
> > point we have some data in the buffer and we can do whatever we want
> > with it, but there is also some data living within the consumer/fetcher.
> > Approximately one second later we call poll again because we need to.
> > We are not getting any data because our partition is paused, and this
> > is good, because if we got data we would not know what to do with it,
> > as our client is still busy crunching the data from the first poll. So
> > far so good. What happens though is that the pre-fetched data gets
> > thrown away upon calling poll, as it is "no longer fetchable...". Maybe
> > I am not fully understanding your suggested approach, but I don't think
> > it would solve this problem.
> >
> > Zahari
> >
> > On Fri, Oct 19, 2018 at 8:10 PM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> Just my 2 cents.
> >>
> >> I am not 100% sure if we would need to change the consumer for this.
> >> While I still think that KIP-349 might be valuable, it seems to be
> >> complementary/orthogonal to the issue discussed here.
> >>
> >> For Kafka Streams, we have a related scenario, and what Kafka Streams
> >> does is to add its own buffer on top of the consumer. Thus, for each
> >> `poll()` all data is put into this buffer, and now Streams can decide
> >> which record to process first. For buffers that have data, we can call
> >> `pause()` without losing fetched data (not a 100% guarantee,
> >> depending on fetch size and max.poll.records etc., but with high
> >> probability), and if a buffer gets empty we `resume()` partitions.
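> >>
> >> As a very rough sketch of this pattern (not the actual Streams code;
> >> just to illustrate the idea, with made-up variable names):
> >>
> >>     Map<TopicPartition, Deque<ConsumerRecord<byte[], byte[]>>> buffers =
> >>         new HashMap<>();
> >>
> >>     ConsumerRecords<byte[], byte[]> batch = consumer.poll(Duration.ofMillis(100));
> >>     // move everything poll() returned into our own per-partition buffers
> >>     for (TopicPartition tp : batch.partitions()) {
> >>         buffers.computeIfAbsent(tp, k -> new ArrayDeque<>())
> >>                .addAll(batch.records(tp));
> >>     }
> >>     // pause partitions that still hold buffered data, resume drained ones
> >>     for (Map.Entry<TopicPartition, Deque<ConsumerRecord<byte[], byte[]>>> e :
> >>             buffers.entrySet()) {
> >>         if (e.getValue().isEmpty()) {
> >>             consumer.resume(Collections.singleton(e.getKey()));
> >>         } else {
> >>             consumer.pause(Collections.singleton(e.getKey()));
> >>         }
> >>     }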
> >>
> >> As Akka Streams builds on top of the consumer, it could implement a
> >> similar pattern. Of course, one cannot use `auto.commit` on the
> >> consumer, but commits need to be managed manually (i.e., only data that
> >> was taken out of the buffer and actually processed can be committed).
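> >>
> >> Roughly (sketch only; `tp` and `lastProcessed` stand for a partition
> >> and the last record that was actually taken from the buffer and
> >> processed):
> >>
> >>     Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
> >>     // the committed offset is the next offset to read, hence the +1
> >>     offsets.put(tp, new OffsetAndMetadata(lastProcessed.offset() + 1));
> >>     consumer.commitSync(offsets);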
> >>
> >> For the `MessageChooser` idea, I also still think it might be useful,
> >> but it's unclear to me if this should be a consumer feature or built on
> >> top of the consumer (maybe it could be a Streams feature, as Streams is
> >> built on top of the consumer). Thoughts?
> >>
> >>
> >> -Matthias
> >>
> >> On 10/18/18 9:27 AM, Jan Filipiak wrote:
> >>> The idea for you would be that MessageChooser could hang on to the
> >>> prefetched messages.
> >>>
> >>> ccing cmcc...@apache.org
> >>>
> >>> @Collin
> >>> just for you to see that MessageChooser is a powerful abstraction.
> >>>
> >>> :)
> >>>
> >>> Best jan
> >>>
> >>> On 18.10.2018 13:59, Zahari Dichev wrote:
> >>>> Jan,
> >>>>
> >>>> Quite insightful indeed. I think your propositions are valid.
> >>>>
> >>>> Ryanne,
> >>>>
> >>>> I understand that consumers are using a pull model... And yes, indeed,
> >>>> if a consumer is not ready for more records it surely should not call
> >>>> poll. Except that it needs to do so periodically in order to indicate
> >>>> that it's alive. Forget about the "backpressure", I guess I was wrong
> >>>> with phrasing this, so let's not get caught up on it.
> >>>>
> >>>> You say pause/resume can be used to prioritise certain
> >>>> topics/partitions over others. And indeed this is the case. So
> >>>> instead of thinking about it in terms of backpressure, let's put it
> >>>> in a different way. The Akka Streams connector would like to
> >>>> prioritise certain topics over others, using one consumer instance.
> >>>> On top of that, add the detail that the priorities change quite
> >>>> frequently (which translates to calling pause/resume frequently). So,
> >>>> all that being said, what would be a proper way to handle the
> >>>> situation without throwing the pre-fetched records away when calling
> >>>> poll on a consumer that happens to have a topic that was recently
> >>>> paused (and that might be un-paused soon)? Am I the only one who
> >>>> considers that an actual problem with the use of pause/resume? Not
> >>>> sure how to explain the situation in a better way...
> >>>>
> >>>> Zahari
> >>>>
> >>>>
> >>>> On Thu, Oct 18, 2018 at 9:46 AM Zahari Dichev <zaharidic...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Thanks a lot Jan,
> >>>>>
> >>>>> I will read it.
> >>>>>
> >>>>> Zahari
> >>>>>
> >>>>> On Thu, Oct 18, 2018 at 9:31 AM Jan Filipiak
> >>>>> <jan.filip...@trivago.com> wrote:
> >>>>>
> >>>>>> especially my suggestions ;)
> >>>>>>
> >>>>>> On 18.10.2018 08:30, Jan Filipiak wrote:
> >>>>>>> Hi Zahari,
> >>>>>>>
> >>>>>>> would you be willing to scan through the KIP-349 discussion a
> >>>>>>> little?
> >>>>>>> I think it has suggestions that could be interesting for you
> >>>>>>>
> >>>>>>> Best Jan
> >>>>>>>
> >>>>>>> On 16.10.2018 09:29, Zahari Dichev wrote:
> >>>>>>>> Hi there Kafka developers,
> >>>>>>>>
> >>>>>>>> I am currently trying to find a solution to an issue that has been
> >>>>>>>> manifesting itself in the Akka Streams implementation of the Kafka
> >>>>>>>> connector. When it comes to consuming messages, the implementation
> >>>>>>>> relies heavily on the fact that we can pause and resume partitions.
> >>>>>>>> In some situations, when a single consumer instance is shared among
> >>>>>>>> several streams, we might end up frequently pausing and unpausing a
> >>>>>>>> set of topic partitions, which is the main facility that allows us
> >>>>>>>> to implement back pressure. This, however, has certain
> >>>>>>>> disadvantages, especially when there are two consumers that differ
> >>>>>>>> in terms of processing speed.
> >>>>>>>>
> >>>>>>>> To articulate the issue more clearly, imagine that a consumer
> >>>>>>>> maintains assignments for two topic partitions, *TP1* and *TP2*.
> >>>>>>>> This consumer is shared by two streams - S1 and S2. So effectively,
> >>>>>>>> when we have demand from only one of the streams - *S1* - we will
> >>>>>>>> pause one of the topic partitions, *TP2*, and call *poll()* on the
> >>>>>>>> consumer to only retrieve the records for the demanded topic
> >>>>>>>> partition - *TP1*. The result of that is that all the records that
> >>>>>>>> have been prefetched for *TP2* are now thrown away by the fetcher
> >>>>>>>> ("*Not returning fetched records for assigned partition TP2 since
> >>>>>>>> it is no longer fetchable*"). If we extrapolate that to multiple
> >>>>>>>> streams sharing the same consumer, we might quickly end up in a
> >>>>>>>> situation where we throw away prefetched data quite often. This
> >>>>>>>> does not seem like the most efficient approach, and in fact it
> >>>>>>>> produces quite a lot of overlapping fetch requests, as illustrated
> >>>>>>>> in the following issue:
> >>>>>>>>
> >>>>>>>> https://github.com/akka/alpakka-kafka/issues/549
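> >>>>>>>>
> >>>>>>>> In code, the problematic sequence is roughly the following
> >>>>>>>> (simplified sketch; topic names are made up):
> >>>>>>>>
> >>>>>>>>     TopicPartition tp1 = new TopicPartition("topic-a", 0);
> >>>>>>>>     TopicPartition tp2 = new TopicPartition("topic-b", 0);
> >>>>>>>>     consumer.assign(Arrays.asList(tp1, tp2));
> >>>>>>>>
> >>>>>>>>     // demand only from S1: pause TP2 before polling
> >>>>>>>>     consumer.pause(Collections.singleton(tp2));
> >>>>>>>>     ConsumerRecords<String, String> records =
> >>>>>>>>         consumer.poll(Duration.ofMillis(100));
> >>>>>>>>     // records now only contains data for TP1; anything the fetcher
> >>>>>>>>     // had already prefetched for TP2 is dropped and has to be
> >>>>>>>>     // fetched again once TP2 is resumed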
> >>>>>>>>
> >>>>>>>> I am writing this email to get some initial opinions on a KIP I
> >>>>>>>> was thinking about. What if we give the clients of the Consumer API
> >>>>>>>> a bit more control over what to do with this prefetched data? Two
> >>>>>>>> options I am wondering about:
> >>>>>>>>
> >>>>>>>> 1. Introduce a configuration setting, such as
> >>>>>>>> *"return-prefetched-data-for-paused-topic-partitions = false"* (I
> >>>>>>>> have to think of a better name), which when set to true will return
> >>>>>>>> what is prefetched instead of throwing it away on calling *poll()*.
> >>>>>>>> Since this amount of data is bounded by the maximum size of the
> >>>>>>>> prefetch, we can control the maximum number of records returned.
> >>>>>>>> The client of the consumer API can then be responsible for keeping
> >>>>>>>> that data around and using it when appropriate (i.e. when demand is
> >>>>>>>> present); a rough sketch of both options follows below.
> >>>>>>>>
> >>>>>>>> 2. Introduce a facility to pass in a buffer into which the
> >>>>>>>> prefetched records are drained when poll is called and paused
> >>>>>>>> partitions have some prefetched records.
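> >>>>>>>>
> >>>>>>>> To illustrate, very roughly, what I have in mind (none of these
> >>>>>>>> names or signatures exist in the current consumer API; they are
> >>>>>>>> purely hypothetical):
> >>>>>>>>
> >>>>>>>>     // option 1: a consumer config that keeps prefetched data for
> >>>>>>>>     // paused partitions instead of discarding it on poll()
> >>>>>>>>     props.put("return.prefetched.data.for.paused.partitions", "true");
> >>>>>>>>
> >>>>>>>>     // option 2: an overload of poll() that drains prefetched data
> >>>>>>>>     // for paused partitions into a caller-supplied buffer
> >>>>>>>>     Map<TopicPartition, List<ConsumerRecord<String, String>>> overflow =
> >>>>>>>>         new HashMap<>();
> >>>>>>>>     ConsumerRecords<String, String> records =
> >>>>>>>>         consumer.poll(Duration.ofMillis(100), overflow);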
> >>>>>>>>
> >>>>>>>> Any opinions on the matter are welcome. Thanks a lot!
> >>>>>>>>
> >>>>>>>> Zahari Dichev
> >>>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>
> >>
> >
>
>
