Kashyap Ivaturi created KAFKA-7365:
--------------------------------------

             Summary: max.poll.records setting in Kafka Consumer is not working
                 Key: KAFKA-7365
                 URL: https://issues.apache.org/jira/browse/KAFKA-7365
             Project: Kafka
          Issue Type: Bug
          Components: consumer
            Reporter: Kashyap Ivaturi

Hi,

I have a requirement to consume messages one by one: each message needs additional processing, after which I manually commit the offset. This works well most of the time, but when I get a large batch of records that takes longer to process, I encounter a CommitFailedException for the last set of records even though they were processed. I am able to reconnect, but the consumer then picks up some messages that I had already processed. I don't want this to happen, because it creates duplicates in the target systems that I integrate with while processing the messages.

I decided that, even when there are more messages in the queue, I would like to control how many records I process per poll. To reproduce the scenario, I started the consumer with 'max.poll.records' set to '1' and then pushed 4 messages into the topic the consumer is listening to. I expected the consumer to process only 1 message because of my 'max.poll.records' setting, but it processed all 4 messages in a single poll.

Any idea why 'max.poll.records' was not honored, or whether some other setting is overriding it? I would appreciate any help or guidance in troubleshooting this issue.

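To make the symptom above concrete, here is a rough sketch of the arithmetic involved, not the actual consumer code. It assumes the CommitFailedException comes from the time between two poll() calls exceeding 'max.poll.interval.ms' (300000 in the config log), and that each record takes a fixed time to process. The class name `PollBudget`, its methods, and the 2000 ms per-record figure are hypothetical and only for illustration; the property keys are the standard consumer settings.

```java
import java.util.Properties;

// Hypothetical helper, stdlib only: no Kafka client classes are used here.
public class PollBudget {

    // Largest batch whose total processing time stays strictly below
    // max.poll.interval.ms, assuming a fixed (hypothetical) cost per record.
    public static long maxRecordsWithinInterval(long maxPollIntervalMs, long perRecordMs) {
        if (maxPollIntervalMs <= 0 || perRecordMs <= 0) {
            throw new IllegalArgumentException("both durations must be positive");
        }
        return (maxPollIntervalMs - 1) / perRecordMs;
    }

    // With manual commits, the offset to commit after processing a record is
    // the offset of the next record to read, i.e. processed offset + 1.
    public static long offsetToCommit(long processedOffset) {
        return processedOffset + 1;
    }

    // The settings from the report that matter for this scenario.
    public static Properties consumerProps(int maxPollRecords) {
        Properties props = new Properties();
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        props.setProperty("max.poll.interval.ms", "300000");
        return props;
    }

    public static void main(String[] args) {
        // Assumed processing cost of 2000 ms per record (illustrative only).
        long budget = maxRecordsWithinInterval(300_000L, 2_000L);
        System.out.println("records that fit in one poll interval: " + budget);
        System.out.println("offset to commit after record 41: " + offsetToCommit(41L));
        System.out.println("consumer settings: " + consumerProps(1));
    }
}
```

Under these assumptions, any batch larger than the computed budget would push the gap between polls past 'max.poll.interval.ms', which is consistent with the commit failing only when a big batch arrives.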
Here is the log of my Consumer config when it starts:

2018-08-28 08:29:47.873 INFO 91121 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [messaging-rtp3.cisco.com:9093]
	check.crcs = true
	client.id =
	connections.max.idle.ms = 540000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = empestor
	heartbeat.interval.ms = 3000
	interceptor.classes = null
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 1
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 40000
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = SSL
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = [hidden]
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = /kafka/certs/empestor/certificates/kafka.client.empestor.keystore.jks
	ssl.keystore.password = [hidden]
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = /kafka/certs/empestor/certificates/kafka.client.truststore.jks
	ssl.truststore.password = [hidden]
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2018-08-28 08:29:48.079 INFO 91121 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.0

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)