`max.poll.interval.ms` is the maximum allowed time between two calls to
`poll()`.

Hence, this config seems to be unrelated. For the background heartbeat
thread there is the `session.timeout.ms` config, but this also seems to
be unrelated.

What I don't fully understand is what you are trying to achieve:

> I ultimately want to wait for the buffer to fill up or sit and collect data
> continuously for 30-45 mins at a time.

What exactly do you mean by this? Do you want `poll()` to block until N
messages are available (or to return with fewer messages if some
timeout, e.g., 30 minutes, hits)?

This would not work, because `poll()` has no _lower_ limit on the number
of messages to return.

What you should do instead is call `poll()` in a loop, buffer all
messages in your application, and trigger the computation when you
reach N messages or the desired timeout.
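
As a rough sketch of that loop: the `BatchBuffer` class below and its
method names are made up for illustration and are not part of the Kafka
client. I also assume here that the timeout clock starts when the first
record of a batch is buffered; adjust that if you want a fixed schedule.
Because `poll()` keeps being called while you buffer, the time between
calls stays short.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not a Kafka class): collects records until either
// maxRecords are buffered or maxAge has passed since the first buffered record.
final class BatchBuffer<T> {
    private final int maxRecords;
    private final long maxAgeMillis;
    private final List<T> buffer = new ArrayList<>();
    private long batchStartMillis;

    BatchBuffer(int maxRecords, Duration maxAge) {
        this.maxRecords = maxRecords;
        this.maxAgeMillis = maxAge.toMillis();
    }

    // Buffer one record; the first record of a batch starts the timeout clock.
    void add(T record, long nowMillis) {
        if (buffer.isEmpty()) {
            batchStartMillis = nowMillis;
        }
        buffer.add(record);
    }

    // True once N records are buffered, or the timeout hit with a non-empty buffer.
    boolean isReady(long nowMillis) {
        if (buffer.size() >= maxRecords) {
            return true;
        }
        return !buffer.isEmpty() && nowMillis - batchStartMillis >= maxAgeMillis;
    }

    // Hand the batch to the caller and start over with an empty buffer.
    List<T> drain() {
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        return batch;
    }
}

// Assumed usage with a KafkaConsumer<String, String> named `consumer`
// (process() is a placeholder for your own computation):
//
//   while (true) {
//       for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofSeconds(1))) {
//           buffer.add(r, System.currentTimeMillis());
//       }
//       if (buffer.isReady(System.currentTimeMillis())) {
//           process(buffer.drain());
//       }
//   }
```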

If buffering in your application is undesired, you can also get the
current offsets via `consumer.position()` and the partition end-offsets
via `consumer.endOffsets()`, compute how many messages are available
broker side, and start to `poll()` once N or the timeout is reached.
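
The computation could look roughly like this. `Backlog.available` is a
hypothetical helper, not a client API; the two maps are assumed to be
filled from `consumer.position()` (one call per assigned partition) and
`consumer.endOffsets()`. Note that the offset difference is only an
estimate of the message count, e.g., on compacted topics or with
transactions offsets can have gaps.

```java
import java.util.Map;

// Hypothetical helper (not a Kafka class). P is the partition key type,
// e.g. TopicPartition when used with the Kafka consumer.
final class Backlog {
    // positions:  next offset the consumer would read, per partition
    // endOffsets: current end offset, per partition
    // Returns the summed offset difference over partitions with a known position.
    static <P> long available(Map<P, Long> positions, Map<P, Long> endOffsets) {
        long total = 0L;
        for (Map.Entry<P, Long> end : endOffsets.entrySet()) {
            Long position = positions.get(end.getKey());
            if (position == null) {
                continue; // no position known for this partition, skip it
            }
            total += Math.max(0L, end.getValue() - position);
        }
        return total;
    }
}
```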


-Matthias


On 3/25/20 3:17 AM, Steve Tian wrote:
> Hi Ryan,
> 
> Have you tried Consumer's pause/resume methods?
> 
> Steve
> 
> On Wed, Mar 25, 2020, 17:13 Kamal Chandraprakash <
> kamal.chandraprak...@gmail.com> wrote:
> 
>> With group coordination protocol, you only have to increase the
>> `max.poll.interval.ms` / `max.poll.records`.
>> Ignore the above messages. Consumer heartbeats are processed in a separate
>> thread.
>>
>> On Wed, Mar 25, 2020 at 2:35 PM Kamal Chandraprakash <
>> kamal.chandraprak...@gmail.com> wrote:
>>
>>> Yes, with `assign` you'll lose the group coordination. You can still use
>>> the `subscribe` mode, update the above mentioned configs.
>>> You're ask is kind of Delay Queue. Kafka Consumer doesn't support that
>>> feature. You've to manually `sleep` in between the poll calls.
>>>
>>> On Tue, Mar 24, 2020 at 11:56 PM Ryan Schachte <
>>> coderyanschac...@gmail.com> wrote:
>>>
>>>> Don't I lose consumer group coordination with assign?
>>>>
>>>> On Mon, Mar 23, 2020 at 11:49 PM Kamal Chandraprakash <
>>>> kamal.chandraprak...@gmail.com> wrote:
>>>>
>>>>> Hi Ryan,
>>>>>
>>>>> The maxPollInterval waits for at-most the given time duration and
>>>>> returns ASAP even if a single record is available.
>>>>> If you want to collect data once 30-45 minutes, better to use the
>>>>> Consumer with `assign` mode and poll for records once in 30 minutes.
>>>>>
>>>>> If you're using the consumer with `subscribe` mode, then you have to
>>>>> update the following configs:
>>>>> 1. session.timeout.ms
>>>>> 2. heartbeat.interval.ms and
>>>>> 3. group.max.session.timeout.ms in the broker configs.
>>>>>
>>>>> Increasing the session timeout will lead to delay in detecting the
>>>>> consumer failures, I would suggest to go with `assign` mode.
>>>>>
>>>>>
>>>>> On Tue, Mar 24, 2020 at 4:45 AM Ryan Schachte <
>>>>> coderyanschac...@gmail.com> wrote:
>>>>>
>>>>>> Hey guys, I'm getting a bit overwhelmed by the different variables
>>>>>> used to help enable batching for me.
>>>>>>
>>>>>> I have some custom batching logic that processes when either N
>>>>>> records have been buffered or my max timeout has been hit. It was
>>>>>> working decently well, but I hit this error:
>>>>>>
>>>>>> *This means that the time between subsequent calls to poll() was
>>>>>> longer than the configured max.poll.interval.ms, which typically
>>>>>> implies that the poll loop is spending too much time message
>>>>>> processing.*
>>>>>>
>>>>>> I ultimately want to wait for the buffer to fill up or sit and
>>>>>> collect data continuously for 30-45 mins at a time. Do I need to do
>>>>>> anything with heartbeat or session timeout as well?
>>>>>>
>>>>>> So now my question is.. Can I just bump my maxPollInterval to
>>>>>> something like:
>>>>>>
>>>>>> maxPollInterval: '2700000',
>>>>>>
>>>>>
>>>>
>>>
>>
> 
