Shantanu,

If the session timeout is 10 minutes, it would mean that Kafka will not notice that a previous consumer has died until 10 minutes have elapsed. So if you kill a consumer and start a new one, it will not start processing for up to 10 minutes!
I would not change the session timeout from the default. Reducing max.poll.records has worked for me in the past for slow-moving streams, but here is another possibility: if it is taking a long time for your consumer to process records, it can fall so far behind that records have expired by the time the consumer commits them. In other words, your consumer can be chasing the tail-end of the stream but never quite keep pace. I have seen consumers processing 10K records/s drop to 100 records/s when this happens. Obviously this exacerbates the problem, and the consumer never recovers. Notably, this can happen in some partitions and not others, which results in behavior like you are seeing.

If this might be the case, I suggest adding more parallelism to your consumer. Try processing each record asynchronously or in parallel. Just be careful that you either a) finish all tasks before the next poll(), or b) turn off auto-commit and do your own checkpointing. In the former case, you can keep max.poll.records high (say 50), process all (up to 50) records in parallel, and then wait for all tasks to finish before calling poll() again.

To increase parallelism without compromising per-partition ordering semantics, I have used a partition-aware thread pool such that tasks in the same partition run in sequence, but tasks from different partitions run in parallel. Basically, the consumer keeps one task queue per partition. Then you can increase the partition count to add parallelism as needed. There is a rough sketch of this approach at the bottom of this message, below the quoted thread.

Ryanne

On Fri, Aug 31, 2018 at 1:28 AM Shantanu Deshmukh <shantanu...@gmail.com> wrote:

> I have noticed a very strange behaviour in case of the session.timeout.ms setting. There are 2 topics for which message processing takes a long time, so I had increased the session timeout there to 5 mins. max.poll.records was kept at 10. Consumers for these topics would start consuming after 5-10 minutes. I reset session.timeout.ms to the default value and now consumers subscribe and start consuming immediately. Rebalances have also reduced.
>
> Now what is this? When a rebalance occurs, the message in the log reads that you need to increase session.timeout.ms or reduce max.poll.records. But if I increase session.timeout.ms to any value above the default, consumers start very slowly. Has anyone seen such behaviour, or can someone explain why this is happening?
>
> On Wed, Aug 29, 2018 at 12:04 PM Shantanu Deshmukh <shantanu...@gmail.com> wrote:
>
> > Hi Ryanne,
> >
> > Thanks for your response. I had even tried with 5 records and a session timeout as big as 10 minutes. Logs still showed that the consumer group rebalanced many times.
> > Also, there is another mystery: some CGs take up to 10 minutes to subscribe to a topic and start consumption. Why might that be happening, any idea?
> >
> > On Tue, Aug 28, 2018 at 8:44 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
> >
> >> Shantanu,
> >>
> >> Sounds like your consumers are processing too many records between poll()s. Notice that max.poll.records is 50. If your consumer is taking up to 200ms to process each record, then you'd see up to 10 seconds between poll()s.
> >>
> >> If a consumer doesn't call poll() frequently enough, Kafka will consider the consumer to be dead and will rebalance away from it. Since all your consumers are in this state, your consumer group is constantly rebalancing.
> >>
> >> Fix is easy: reduce max.poll.records.
> >>
> >> Ryanne
> >>
> >> On Tue, Aug 28, 2018 at 6:34 AM Shantanu Deshmukh <shantanu...@gmail.com> wrote:
> >>
> >> > Someone, please help me. Only 1 or 2 out of 7 consumer groups keep rebalancing every 5-10 mins. One topic is constantly receiving 10-20 msg/sec. The other one receives a bulk load after many hours of inactivity. CGs for both these topics are different, so I see no observable pattern here.
> >> >
> >> > On Wed, Aug 22, 2018 at 5:47 PM Shantanu Deshmukh <shantanu...@gmail.com> wrote:
> >> >
> >> > > I know the average time for processing one record; it is about 70-80ms. I have set session.timeout.ms so high that the total processing time for one poll invocation should be well within it.
> >> > >
> >> > > On Wed, Aug 22, 2018 at 5:04 PM Steve Tian <steve.cs.t...@gmail.com> wrote:
> >> > >
> >> > >> Have you measured the duration between two `poll` invocations and the size of the returned `ConsumerRecords`?
> >> > >>
> >> > >> On Wed, Aug 22, 2018, 7:00 PM Shantanu Deshmukh <shantanu...@gmail.com> wrote:
> >> > >>
> >> > >> > Ohh sorry, my bad. Kafka version is 0.10.1.0 indeed and so is the client.
> >> > >> >
> >> > >> > On Wed, Aug 22, 2018 at 4:26 PM Steve Tian <steve.cs.t...@gmail.com> wrote:
> >> > >> >
> >> > >> > > NVM. What's your client version? I'm asking because max.poll.interval.ms was only introduced in 0.10.1.0, which is not the version you mentioned in the email thread.
> >> > >> > >
> >> > >> > > On Wed, Aug 22, 2018, 6:51 PM Shantanu Deshmukh <shantanu...@gmail.com> wrote:
> >> > >> > >
> >> > >> > > > How do I check for GC pausing?
> >> > >> > > >
> >> > >> > > > On Wed, Aug 22, 2018 at 4:12 PM Steve Tian <steve.cs.t...@gmail.com> wrote:
> >> > >> > > >
> >> > >> > > > > Did you observe any GC pausing?
> >> > >> > > > >
> >> > >> > > > > On Wed, Aug 22, 2018, 6:38 PM Shantanu Deshmukh <shantanu...@gmail.com> wrote:
> >> > >> > > > >
> >> > >> > > > > > Hi Steve,
> >> > >> > > > > >
> >> > >> > > > > > The application is just sending mails. Every record is just an email request with very basic business logic. Generally it doesn't take more than 200ms to process a single mail. Currently it is averaging out at 70-80 ms.
> >> > >> > > > > >
> >> > >> > > > > > On Wed, Aug 22, 2018 at 3:06 PM Steve Tian <steve.cs.t...@gmail.com> wrote:
> >> > >> > > > > >
> >> > >> > > > > > > How long did it take to process 50 `ConsumerRecord`s?
> >> > >> > > > > > >
> >> > >> > > > > > > On Wed, Aug 22, 2018, 5:16 PM Shantanu Deshmukh <shantanu...@gmail.com> wrote:
> >> > >> > > > > > >
> >> > >> > > > > > > > Hello,
> >> > >> > > > > > > >
> >> > >> > > > > > > > We have Kafka 0.10.0.1 running on a 3-broker cluster. We have an application which consumes from a topic having 10 partitions. 10 consumers are spawned from this process; they belong to one consumer group.
> >> > >> > > > > > > >
> >> > >> > > > > > > > What we have observed is that very frequently we are observing such messages in consumer logs:
> >> > >> > > > > > > >
> >> > >> > > > > > > > [2018-08-21 11:12:46] :: WARN :: ConsumerCoordinator:554 - Auto offset commit failed for group otp-email-consumer: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
> >> > >> > > > > > > > [2018-08-21 11:12:46] :: INFO :: ConsumerCoordinator:333 - Revoking previously assigned partitions [otp-email-1, otp-email-0, otp-email-3, otp-email-2] for group otp-email-consumer
> >> > >> > > > > > > > [2018-08-21 11:12:46] :: INFO :: AbstractCoordinator:381 - (Re-)joining group otp-email-consumer
> >> > >> > > > > > > > [2018-08-21 11:12:46] :: INFO :: AbstractCoordinator:600 - *Marking the coordinator x.x.x.x:9092 (id: 2147483646 rack: null) dead for group otp-email-consumer*
> >> > >> > > > > > > > [2018-08-21 11:12:46] :: INFO :: AbstractCoordinator:600 - *Marking the coordinator x.x.x.x:9092 (id: 2147483646 rack: null) dead for group otp-email-consumer*
> >> > >> > > > > > > > [2018-08-21 11:12:46] :: INFO :: AbstractCoordinator$GroupCoordinatorResponseHandler:555 - Discovered coordinator 10.189.179.117:9092 (id: 2147483646 rack: null) for group otp-email-consumer.
> >> > >> > > > > > > > [2018-08-21 11:12:46] :: INFO :: AbstractCoordinator:381 - (Re-)joining group otp-email-consumer
> >> > >> > > > > > > >
> >> > >> > > > > > > > After this, the group enters rebalancing phase and it takes about 5-10 minutes to start consuming messages again.
> >> > >> > > > > > > > What does this message mean? The actual broker doesn't go down as per our monitoring tools. So how come it is declared dead?
> >> > >> > > > > > > > Please help, I am stuck on this issue since 2 months now.
> >> > >> > > > > > > >
> >> > >> > > > > > > > Here's our consumer configuration:
> >> > >> > > > > > > > auto.commit.interval.ms = 3000
> >> > >> > > > > > > > auto.offset.reset = latest
> >> > >> > > > > > > > bootstrap.servers = [x.x.x.x:9092, x.x.x.x:9092, x.x.x.x:9092]
> >> > >> > > > > > > > check.crcs = true
> >> > >> > > > > > > > client.id =
> >> > >> > > > > > > > connections.max.idle.ms = 540000
> >> > >> > > > > > > > enable.auto.commit = true
> >> > >> > > > > > > > exclude.internal.topics = true
> >> > >> > > > > > > > fetch.max.bytes = 52428800
> >> > >> > > > > > > > fetch.max.wait.ms = 500
> >> > >> > > > > > > > fetch.min.bytes = 1
> >> > >> > > > > > > > group.id = otp-notifications-consumer
> >> > >> > > > > > > > heartbeat.interval.ms = 3000
> >> > >> > > > > > > > interceptor.classes = null
> >> > >> > > > > > > > key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
> >> > >> > > > > > > > max.partition.fetch.bytes = 1048576
> >> > >> > > > > > > > max.poll.interval.ms = 300000
> >> > >> > > > > > > > max.poll.records = 50
> >> > >> > > > > > > > metadata.max.age.ms = 300000
> >> > >> > > > > > > > metric.reporters = []
> >> > >> > > > > > > > metrics.num.samples = 2
> >> > >> > > > > > > > metrics.sample.window.ms = 30000
> >> > >> > > > > > > > partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
> >> > >> > > > > > > > receive.buffer.bytes = 65536
> >> > >> > > > > > > > reconnect.backoff.ms = 50
> >> > >> > > > > > > > request.timeout.ms = 305000
> >> > >> > > > > > > > retry.backoff.ms = 100
> >> > >> > > > > > > > sasl.kerberos.kinit.cmd = /usr/bin/kinit
> >> > >> > > > > > > > sasl.kerberos.min.time.before.relogin = 60000
> >> > >> > > > > > > > sasl.kerberos.service.name = null
> >> > >> > > > > > > > sasl.kerberos.ticket.renew.jitter = 0.05
> >> > >> > > > > > > > sasl.kerberos.ticket.renew.window.factor = 0.8
> >> > >> > > > > > > > sasl.mechanism = GSSAPI
> >> > >> > > > > > > > security.protocol = SSL
> >> > >> > > > > > > > send.buffer.bytes = 131072
> >> > >> > > > > > > > session.timeout.ms = 300000
> >> > >> > > > > > > > ssl.cipher.suites = null
> >> > >> > > > > > > > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> >> > >> > > > > > > > ssl.endpoint.identification.algorithm = null
> >> > >> > > > > > > > ssl.key.password = null
> >> > >> > > > > > > > ssl.keymanager.algorithm = SunX509
> >> > >> > > > > > > > ssl.keystore.location = null
> >> > >> > > > > > > > ssl.keystore.password = null
> >> > >> > > > > > > > ssl.keystore.type = JKS
> >> > >> > > > > > > > ssl.protocol = TLS
> >> > >> > > > > > > > ssl.provider = null
> >> > >> > > > > > > > ssl.secure.random.implementation = null
> >> > >> > > > > > > > ssl.trustmanager.algorithm = PKIX
> >> > >> > > > > > > > ssl.truststore.location = /x/x/client.truststore.jks
> >> > >> > > > > > > > ssl.truststore.password = [hidden]
> >> > >> > > > > > > > ssl.truststore.type = JKS
> >> > >> > > > > > > > value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
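
P.S. Here is a rough, untested sketch of the partition-aware approach described above, just to show the shape of it. The topic name ("otp-email"), the single placeholder bootstrap server, and the processRecord() stand-in for your mail-sending logic are assumptions for illustration, not taken from your actual setup, and the config is stripped down (no SSL, etc.). The idea: keep max.poll.records at 50, route each record to a single-threaded executor dedicated to its partition (so per-partition order is preserved while different partitions run in parallel), wait for every task from the current poll to finish, then commit offsets manually (auto-commit off) and poll again.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PartitionAwareConsumer {

    // One single-threaded executor per partition: records from the same
    // partition run in order, records from different partitions run in parallel.
    private static final Map<TopicPartition, ExecutorService> EXECUTORS = new HashMap<>();

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "x.x.x.x:9092");  // placeholder
        props.put("group.id", "otp-email-consumer");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");  // we commit manually below
        props.put("max.poll.records", "50");       // keep this high; parallelize instead

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("otp-email"));  // placeholder topic

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(500);
                List<Future<?>> pending = new ArrayList<>();

                // Fan records out to the executor owning their partition.
                for (ConsumerRecord<String, String> record : records) {
                    TopicPartition tp = new TopicPartition(record.topic(), record.partition());
                    ExecutorService exec = EXECUTORS.computeIfAbsent(
                        tp, ignored -> Executors.newSingleThreadExecutor());
                    pending.add(exec.submit(() -> processRecord(record)));
                }

                // Option a) from above: finish every task from this poll before
                // calling poll() again, so the consumer gets back to poll()
                // quickly and is not kicked out of the group.
                for (Future<?> f : pending) {
                    f.get();  // rethrows any processing exception
                }

                // All records from this poll are processed; now it is safe to
                // commit their offsets.
                consumer.commitSync();
            }
        }
    }

    // Placeholder for the real work, e.g. sending a single email (~70-200 ms).
    private static void processRecord(ConsumerRecord<String, String> record) {
        // send the email here
    }
}

Then, as mentioned above, you can increase the partition count when you need more parallelism; this loop will pick up the extra partitions and run them concurrently without further changes. Treat it as a starting point, not drop-in code.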