[ 
https://issues.apache.org/jira/browse/KAFKA-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598591#comment-16598591
 ] 

M. Manna edited comment on KAFKA-7365 at 8/31/18 12:06 PM:
-----------------------------------------------------------

[~kivaturi] I am not sure how your heartbeat interval (and session timeout) has 
been harmonised as per the long processing delay. Also, max.poll.records is a 
tested config in PlainTextConsumerTest.scala file which works as expected.

 

{{Do you know roughly how much delay (max) you need to process the message? 
e.g. 20s ? If so, do you want to set that in your heartbeat.interval.ms=20000 
(and adjust session.timeout.ms to be ~3x more than that) and see if you are 
still encountering the issue? I might have overlooked something, but from the 
code I don't see any reason why this will be overridden (will check this 
anyway).}}

{{Also, please check the description of CommitFailedException which might help 
you understand this issue.}}

_{{Commit cannot be completed since the group has already rebalanced and 
assigned the partitions to another member. This means that the time between 
subsequent calls to poll() was longer than the configured max.poll.interval.ms, 
which typically implies that the poll loop is spending too much time message 
processing. You can address this either by increasing the session timeout or by 
reducing the maximum size of batches returned in poll() with 
max.poll.records.}}_

 


was (Author: manme...@gmail.com):
[~kivaturi] I am not sure how your heartbeat interval (and session timeout) has 
been harmonised as per the long processing delay. Also, max.poll.records is a 
tested config in PlainTextConsumerTest.scala file which works as expected.

 

{{Do you know roughly how much delay (max) you need to process the message? 
e.g. 20s ? If so, do you want to set that in your heartbeat.interval.ms=20000 
(and adjust session.timeout.ms to be ~3x more than that) and see if you are 
still encountering the issue? I might have overlooked something, but from the 
code I don't see any reason why this will be overridden (will check this 
anyway).}}

{{Also, please check the description of CommitFailedException which might help 
you understand this issue.}}

_{{Commit cannot be completed since the group has already }}{{rebalanced and 
assigned the partitions to another member. This means that the time }}{{between 
subsequent calls to poll() was longer than the configured max.poll.interval.ms, 
}}{{which typically implies that the poll loop is spending too much time 
message processing. }}{{You can address this either by increasing the session 
timeout or by reducing the maximum }}{{size of batches returned in poll() with 
max.poll.records.}}_

 

> max.poll.records setting in Kafka Consumer is not working
> ---------------------------------------------------------
>
>                 Key: KAFKA-7365
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7365
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>            Reporter: Kashyap Ivaturi
>            Priority: Major
>
> Hi,
> I have a requirement where I consume messages one by one, each message has 
> additional processing that I should do and then manually commit the offset.
> Things work well most of the times until I get a big bunch of records which 
> takes longer time to process and I encounter CommitFailed exception for the 
> last set of records even though they were processed. While i'am able to 
> reconnect back its picking some messages that I had already processed. I 
> don't want this to happen as its creating duplicates in target systems that I 
> integrate with while processing the message.
>  
> I decided that even though there are more messages in the queue , I would 
> like to have a control on how many records I can process when polled.
> I tried to replicate a scenario where I have started the consumer by setting 
> 'max.poll.records' to '1' and then pushed 4 messages into the Topic the 
> consumer is listening.
> I expected that the consumer will only process 1 message because of my 
> 'max.poll.records' setting but the consumer has processed all the 4 messages 
> in single poll. Any idea why did it not consider 'max.poll.records' setting 
> or is some other setting overriding this setting?. Appreciate your help or 
> guidance in troubleshooting this issue.
> Here is the log of my Consumer config when it starts:
>  
> 2018-08-28 08:29:47.873  INFO 91121 --- [           main] 
> o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
> [auto.commit.interval.ms|https://auto.commit.interval.ms/] = 5000
> auto.offset.reset = earliest
> bootstrap.servers = [messaging-rtp3.cisco.com:9093]
> check.crcs = true
> [client.id|https://client.id/] = 
> [connections.max.idle.ms|https://connections.max.idle.ms/] = 540000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> [fetch.max.wait.ms|https://fetch.max.wait.ms/] = 500
> fetch.min.bytes = 1
> [group.id|https://group.id/] = empestor
> [heartbeat.interval.ms|https://heartbeat.interval.ms/] = 3000
> interceptor.classes = null
> internal.leave.group.on.close = true
> isolation.level = read_uncommitted
> key.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
> max.partition.fetch.bytes = 1048576
> [max.poll.interval.ms|https://max.poll.interval.ms/] = 300000
> max.poll.records = 1
> [metadata.max.age.ms|https://metadata.max.age.ms/] = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> [metrics.sample.window.ms|https://metrics.sample.window.ms/] = 30000
> partition.assignment.strategy = 
> [org.apache.kafka.clients.consumer.RangeAssignor]
> receive.buffer.bytes = 65536
> [reconnect.backoff.max.ms|https://reconnect.backoff.max.ms/] = 1000
> [reconnect.backoff.ms|https://reconnect.backoff.ms/] = 50
> [request.timeout.ms|https://request.timeout.ms/] = 40000
> [retry.backoff.ms|https://retry.backoff.ms/] = 100
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.mechanism = GSSAPI
> security.protocol = SSL
> send.buffer.bytes = 131072
> [session.timeout.ms|https://session.timeout.ms/] = 10000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = null
> ssl.key.password = [hidden]
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = 
> /kafka/certs/empestor/certificates/kafka.client.empestor.keystore.jks
> ssl.keystore.password = [hidden]
> ssl.keystore.type = JKS
> ssl.protocol = TLS
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.location = 
> /kafka/certs/empestor/certificates/kafka.client.truststore.jks
> ssl.truststore.password = [hidden]
> ssl.truststore.type = JKS
> value.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
>  
> 2018-08-28 08:29:48.079  INFO 91121 --- [           main] 
> o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to