> You have one consumer that is quite
> slow so let's say it calls poll every 5 seconds, while you need to call poll
> every 1 second to issue a heartbeat (these are made up numbers of course).

Heartbeats have been sent by a background thread since 0.10.1.0 (KIP-62).
There is also a second configuration, `max.poll.interval.ms`, that you can
increase for a slow consumer. Kafka Streams sets it to `Integer.MAX_VALUE`,
for example.
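
To make the configuration point concrete, here is a minimal sketch of the
consumer properties for a slow consumer. The class name, the method name, and
the bootstrap/group values are made up for illustration; only the config keys
are real consumer settings.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka: builds properties for a slow consumer.
final class SlowConsumerConfig {
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Maximum allowed time between two poll() calls. Raising it lets a
        // slow consumer take longer per batch; heartbeats are sent
        // independently by the background thread.
        props.setProperty("max.poll.interval.ms", String.valueOf(Integer.MAX_VALUE));
        return props;
    }
}
```

These properties would then be passed to the `KafkaConsumer` constructor
together with the deserializer settings.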

For the remainder, I agree that using a buffer on top might not be a
perfect solution. That is what I meant by:

> (not 100% percent guarantee,
> depending on fetch size and max.poll.record etc, but with high
> probability)

However, I believe that if you configure the consumer accordingly, you
can drain the fetch buffer if you block on `poll()` forever.

I have to admit that I am not familiar with the details of pipelining
fetch requests, though. The general idea is to make sure you drain the
internal buffer of the consumer before you call `pause()`.
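
A rough sketch of the bookkeeping I have in mind, using plain Java collections
only. This is not the actual Kafka Streams code; `PartitionBuffers` and its
methods are hypothetical names, and partitions and records are modeled as
strings to keep it self-contained. With the real consumer, the returned sets
would be mapped to `TopicPartition` objects and passed to `pause()` and
`resume()`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical application-side buffer that sits on top of the consumer.
final class PartitionBuffers {
    private final Map<String, Deque<String>> buffers = new HashMap<>();

    // Store everything a poll() returned for one partition.
    void add(String partition, List<String> records) {
        buffers.computeIfAbsent(partition, p -> new ArrayDeque<>()).addAll(records);
    }

    // Take the next buffered record for a partition, or null if there is none.
    String next(String partition) {
        Deque<String> queue = buffers.get(partition);
        return (queue == null) ? null : queue.poll();
    }

    // Partitions that still hold buffered records: candidates for pause().
    Set<String> toPause() {
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, Deque<String>> e : buffers.entrySet()) {
            if (!e.getValue().isEmpty()) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    // Known partitions whose buffer is empty: candidates for resume().
    Set<String> toResume() {
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, Deque<String>> e : buffers.entrySet()) {
            if (e.getValue().isEmpty()) {
                result.add(e.getKey());
            }
        }
        return result;
    }
}
```

The sketch only covers the application-side buffer. Whether the consumer's
internal fetch buffer is already empty when `pause()` is called is exactly
the open question here.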

I am curious to learn why this would not work. How does the pipelining of
fetch requests work in detail?


-Matthias


On 10/19/18 1:36 PM, Zahari Dichev wrote:
> Hi there Matthias,
> 
> Very useful thoughts indeed. I have considered the exact same approach but
> what worries me a bit is that I do not think that will certainly solve the
> issue. Imagine the following situation. You have one consumer that is quite
> slow so let's say it calls poll every 5 seconds, while you need to call poll
> every 1 second to issue a heartbeat (these are made up numbers of course).
> 
> So our consumer calls poll at T0 grabs some data and puts it in a buffer
> and calls pause on the topic partition. The fetcher tries to pipeline and
> issues a fetch request and at some point the data arrives. At that point we
> have some data in the buffer and we can do whatever we want with it, but
> there is also some data living within the consumer/fetcher. Approximately
> one second later we call poll again because we need to. We are not getting
> any data because our partition is paused and this is good because if we got
> data we would not know what to do with it as our client is still busy
> crunching the data from the first poll. So far so good. What happens though
> is that the pre-fetched data gets thrown away upon calling poll as it is "no
> longer fetchable...". Maybe I am not fully understanding your suggested
> approach, but I don't think it would solve this problem.
> 
> Zahari
> 
> On Fri, Oct 19, 2018 at 8:10 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> Just my 2 cents.
>>
>> I am not 100% sure if we would need to change the consumer for this.
>> While I still think, that KIP-349 might be valuable, it seems to be
>> complementary/orthogonal to the issue discussed here.
>>
>> For Kafka Streams, we have a related scenario and what Kafka Streams
>> does is, to add its own buffer on top of the consumer. Thus, for each
>> `poll()` all data is put into this buffer, and now Streams can decide
>> which record to process first. For buffers that have data, we can call
>> `pause()` without losing fetched data (not 100% percent guarantee,
>> depending on fetch size and max.poll.record etc, but with high
>> probability) and if a buffer gets empty we `resume()` partitions.
>>
>> As Akka Streams builds on top of the consumer it could implement a
>> similar pattern. Of course, one cannot use `auto.commit` on the
>> consumer, but commits need to be managed manually (i.e., only data that
>> was taken out of the buffer and actually processed can be committed).
>>
>> For the `MessageChooser` idea, I also still think it might be useful,
>> but it's unclear to me if this should be a consumer feature or built on
>> top of the consumer (maybe it could be a Streams feature, as Streams is
>> built on top of the consumer). Thoughts?
>>
>>
>> -Matthias
>>
>> On 10/18/18 9:27 AM, Jan Filipiak wrote:
>>> The idea for you would be that MessageChooser could hang on to the
>>> prefetched messages.
>>>
>>> ccing cmcc...@apache.org
>>>
>>> @Collin
>>> just for you to see that MessageChooser is a powerful abstraction.
>>>
>>> :)
>>>
>>> Best jan
>>>
>>> On 18.10.2018 13:59, Zahari Dichev wrote:
>>>> Jan,
>>>>
>>>> Quite insightful indeed. I think your propositions are valid.
>>>>
>>>> Ryanne,
>>>>
>>>> I understand that consumers are using a pull model... And yes, indeed if a
>>>> consumer is not ready for more records it surely should not call poll.
>>>> Except that it needs to do so periodically in order to indicate that it's
>>>> live. Forget about the "backpressure", I guess I was wrong with phrasing
>>>> this so let's not get caught up on it.
>>>>
>>>> You say pause/resume can be used to prioritise certain topics/partitions
>>>> over others. And indeed this is the case. So instead of thinking about it
>>>> in terms of backpressure, let's put it in a different way. The Akka Streams
>>>> connector would like to prioritise certain topics over others, using one
>>>> consumer instance. On top of that, add the detail that the priorities
>>>> change quite frequently (which translates to calling pause/resume
>>>> frequently). So all that being said, what would be a proper way to handle
>>>> the situation without throwing the pre-fetched records away when calling
>>>> poll on a consumer that happens to have a topic that was recently paused
>>>> (and that might be un-paused soon)? Am I the only one who considers that
>>>> an actual problem with the use of pause/resume? Not sure how to explain
>>>> the situation in a better way.
>>>>
>>>> Zahari
>>>>
>>>>
>>>> On Thu, Oct 18, 2018 at 9:46 AM Zahari Dichev <zaharidic...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks a lot Jan,
>>>>>
>>>>> I will read it.
>>>>>
>>>>> Zahari
>>>>>
>>>>> On Thu, Oct 18, 2018 at 9:31 AM Jan Filipiak <jan.filip...@trivago.com>
>>>>> wrote:
>>>>>
>>>>>> especially my suggestions ;)
>>>>>>
>>>>>> On 18.10.2018 08:30, Jan Filipiak wrote:
>>>>>>> Hi Zahari,
>>>>>>>
>>>>>>> would you be willing to scan through the KIP-349 discussion a little?
>>>>>>> I think it has suggestions that could be interesting for you
>>>>>>>
>>>>>>> Best Jan
>>>>>>>
>>>>>>> On 16.10.2018 09:29, Zahari Dichev wrote:
>>>>>>>> Hi there Kafka developers,
>>>>>>>>
>>>>>>>> I am currently trying to find a solution to an issue that has been
>>>>>>>> manifesting itself in the Akka streams implementation of the Kafka
>>>>>>>> connector. When it comes to consuming messages, the implementation relies
>>>>>>>> heavily on the fact that we can pause and resume partitions. In some
>>>>>>>> situations when a single consumer instance is shared among several streams,
>>>>>>>> we might end up with frequently pausing and unpausing a set of topic
>>>>>>>> partitions, which is the main facility that allows us to implement back
>>>>>>>> pressure. This however has certain disadvantages, especially when there are
>>>>>>>> two consumers that differ in terms of processing speed.
>>>>>>>>
>>>>>>>> To articulate the issue more clearly, imagine that a consumer maintains
>>>>>>>> assignments for two topic partitions *TP1* and *TP2*. This consumer is
>>>>>>>> shared by two streams - S1 and S2. So effectively when we have demand from
>>>>>>>> only one of the streams - *S1*, we will pause one of the topic partitions
>>>>>>>> *TP2* and call *poll()* on the consumer to only retrieve the records for
>>>>>>>> the demanded topic partition - *TP1*. The result of that is all the records
>>>>>>>> that have been prefetched for *TP2* are now thrown away by the fetcher
>>>>>>>> ("*Not returning fetched records for assigned partition TP2 since it is no
>>>>>>>> longer fetchable*"). If we extrapolate that to multiple streams sharing the
>>>>>>>> same consumer, we might quickly end up in a situation where we throw
>>>>>>>> prefetched data quite often. This does not seem like the most efficient
>>>>>>>> approach and in fact produces quite a lot of overlapping fetch requests as
>>>>>>>> illustrated in the following issue:
>>>>>>>>
>>>>>>>> https://github.com/akka/alpakka-kafka/issues/549
>>>>>>>>
>>>>>>>> I am writing this email to get some initial opinion on a KIP I was thinking
>>>>>>>> about. What if we give the clients of the Consumer API a bit more control
>>>>>>>> of what to do with this prefetched data. Two options I am wondering about:
>>>>>>>>
>>>>>>>> 1. Introduce a configuration setting, such as
>>>>>>>> *"return-prefetched-data-for-paused-topic-partitions = false"* (have to
>>>>>>>> think of a better name), which when set to true will return what is
>>>>>>>> prefetched instead of throwing it away on calling *poll()*. Since this is
>>>>>>>> an amount of data that is bounded by the maximum size of the prefetch, we
>>>>>>>> can control what is the most amount of records returned. The client of the
>>>>>>>> consumer API can then be responsible for keeping that data around and use
>>>>>>>> it when appropriate (i.e. when demand is present)
>>>>>>>>
>>>>>>>> 2. Introduce a facility to pass in a buffer into which the prefetched
>>>>>>>> records are drained when poll is called and paused partitions have some
>>>>>>>> prefetched records.
>>>>>>>>
>>>>>>>> Any opinions on the matter are welcome. Thanks a lot !
>>>>>>>>
>>>>>>>> Zahari Dichev
>>>>>>>>
>>>>>>
>>>>>
>>>>
>>
>>
> 
