`max.poll.intervall.ms` is the maximum allowed time between two calls to `poll()`.
Hence, this config seems to be unrelated. For the background heartbeat thread there would be `session.timeout.ms` config but this also seems to be unrelated. What I don't fully understand is, what you try to achieve: > I ultimately want to wait for the buffer to fill up or sit and collect data > continuously for 30-45 mins at a time. What do you exactly mean by this? Do you want `poll()` to block until N messages are available (or return if less message are available but some timeout, ie, 30 minutes hits?) This would not work, because `poll()` has no _lower_ limit on number of messaged to return. What you should do instead is, to call `poll()` just in a loop and buffer all messages in your application and trigger the computation when your reach N messages or the desired timeout. If buffering in your application is undesired, you can also get the current offsets via `consumer.position()` and get the partition end-offsets via `consumer.endOffsets()` and compute how many message are available broker side and start to `poll()` if N or the timeout is reached. -Matthias On 3/25/20 3:17 AM, Steve Tian wrote: > Hi Ryan, > > Have you tried Consumer's pause/resume methods? > > Steve > > On Wed, Mar 25, 2020, 17:13 Kamal Chandraprakash < > kamal.chandraprak...@gmail.com> wrote: > >> With group coordination protocol, you only have to increase the ` >> max.poll.interval.ms` / `max.poll.records`. >> Ignore the above messages. Consumer heartbeats are processed in a separate >> thread. >> >> On Wed, Mar 25, 2020 at 2:35 PM Kamal Chandraprakash < >> kamal.chandraprak...@gmail.com> wrote: >> >>> Yes, with `assign` you'll lose the group coordination. You can still use >>> the `subscribe` mode, update the above mentioned configs. >>> You're ask is kind of Delay Queue. Kafka Consumer doesn't support that >>> feature. You've to manually `sleep` in between the poll calls. >>> >>> On Tue, Mar 24, 2020 at 11:56 PM Ryan Schachte < >> coderyanschac...@gmail.com> >>> wrote: >>> >>>> Don't I lose consumer group coordination with assign? >>>> >>>> On Mon, Mar 23, 2020 at 11:49 PM Kamal Chandraprakash < >>>> kamal.chandraprak...@gmail.com> wrote: >>>> >>>>> Hi Ryan, >>>>> >>>>> The maxPollInterval waits for at-most the given time duration and >>>> returns >>>>> ASAP even if a single record is available. >>>>> If you want to collect data once 30-45 minutes, better to use the >>>> Consumer >>>>> with `assign` mode and poll for records >>>>> once in 30 minutes. >>>>> >>>>> If you're using the consumer with `subscribe` mode, then you have to >>>> update >>>>> the following configs: >>>>> 1. session.timeout.ms >>>>> 2. heartbeat.interval.ms and >>>>> 3. group.max.session.timeout.ms in the broker configs. >>>>> >>>>> Increasing the session timeout will lead to delay in detecting the >>>> consumer >>>>> failures, I would suggest to go with `assign` mode. >>>>> >>>>> >>>>> On Tue, Mar 24, 2020 at 4:45 AM Ryan Schachte < >>>> coderyanschac...@gmail.com> >>>>> wrote: >>>>> >>>>>> Hey guys, I'm getting a bit overwhelmed by the different variables >>>> used >>>>> to >>>>>> help enable batching for me. >>>>>> >>>>>> I have some custom batching logic that processes when either N >> records >>>>> have >>>>>> been buffered or my max timeout has been hit. It was working >> decently >>>>> well, >>>>>> but I hit this error: >>>>>> >>>>>> *This means that the time between subsequent calls to poll() was >>>> longer >>>>>> than the configured max.poll.interval.ms < >> http://max.poll.interval.ms >>>>> , >>>>>> which typically implies that the poll loop is spending too much time >>>>>> message processing.* >>>>>> >>>>>> I ultimately want to wait for the buffer to fill up or sit and >> collect >>>>> data >>>>>> continuously for 30-45 mins at a time. Do I need to do anything with >>>>>> heartbeat or session timeout as well? >>>>>> >>>>>> So now my question is.. Can I just bump my maxPollInterval to >>>> something >>>>>> like: >>>>>> >>>>>> maxPollInterval: '2700000', >>>>>> >>>>> >>>> >>> >> >
signature.asc
Description: OpenPGP digital signature