[ 
https://issues.apache.org/jira/browse/CAMEL-21689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Claus Ibsen updated CAMEL-21689:
--------------------------------
    Issue Type: Task  (was: Bug)

> Apache Camel Kafka Batching Consumer behaviour discrepancy w.r.t pollTimeoutMs
> ------------------------------------------------------------------------------
>
>                 Key: CAMEL-21689
>                 URL: https://issues.apache.org/jira/browse/CAMEL-21689
>             Project: Camel
>          Issue Type: Task
>          Components: camel-kafka
>    Affects Versions: 4.8.3
>            Reporter: Manjunath S Horapeti
>            Priority: Major
>             Fix For: 4.8.4, 4.10.0
>
>         Attachments: image-2025-01-31-01-18-05-681.png, 
> image-2025-01-31-01-25-52-332.png, image-2025-01-31-01-26-53-858.png
>
>
> Hi Team,
>  
> I'm reaching out about the behaviour of "pollTimeoutMs" and a discrepancy 
> we observed between the documentation and the actual behaviour. 
> !image-2025-01-31-01-18-05-681.png|width=627,height=156!
> According to the documentation shown above, from the 4.8.x version of the 
> Camel Kafka component page 
> (https://camel.apache.org/components/4.8.x/kafka-component.html#_batching_consumer),
> "pollTimeoutMs" works in tandem with "maxPollRecords": the consumer either 
> polls "maxPollRecords" records or blocks for a maximum of "pollTimeoutMs". 
> However, we observed that the Camel route kept waiting until the 
> "maxPollRecords" count was reached, and only then processed the batch.
>  
> For example, our route is as follows:
>  
> {code:java}
> from("kafka:topic-name?brokers=brokers" + 
> "&groupId=groupid&pollTimeoutMs=10000" + 
> "&batching=true&maxPollRecords=50000")
> .bean(this, "methodName");{code}
>  
> This route always waits until 50000 records are present in the topic before 
> proceeding, ignoring the pollTimeoutMs of 10000 (10 seconds). For example, if 
> the producer is producing messages at a rate of 50-100 msgs per second, the 
> application waits for nearly 500-1000 seconds, that is, until the 50000 
> record count is met, before proceeding.
>  
> We believe that the code shown below, from "KafkaRecordBatchingProcessor", 
> is never executed because every poll returns at least one or two messages, 
> so the "hasExpiredRecords" method, with its condition 
> consumerRecords.isEmpty(), always returns false.
> We believe this makes the application wait until maxPollRecords (50000) 
> is reached before proceeding.
> !image-2025-01-31-01-25-52-332.png|width=640,height=257!
> Definition of "hasExpiredRecords()"
> !image-2025-01-31-01-26-53-858.png|width=653,height=53!
> Could you please let us know whether the above behaviour is expected? If 
> so, how can we either pull Y messages from the topic or write whatever 
> messages are received within X seconds (the exact behaviour described in 
> the Batching Consumer documentation above)? 
>  
> Thanks in advance.
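
To make the reported discrepancy concrete, here is a minimal simulation sketch of the two flush policies described in the issue. It is not Camel's code: the class name BatchFlushSketch, the Policy enum, the flushTimes method, and the one-poll-per-second loop are all assumptions made for illustration. It only models "flush when the batch is full, or when the timeout has expired", with the timeout checked either on every poll or only when a poll returns no records (the behaviour the reporter suspects).

```java
import java.util.ArrayList;
import java.util.List;

public class BatchFlushSketch {

    /** Hypothetical flush policies for illustration; not Camel's actual implementation. */
    enum Policy {
        // Timeout is only honoured when a poll returns no records (the suspected behaviour).
        TIMEOUT_ONLY_ON_EMPTY_POLL,
        // Timeout is honoured on every poll (the behaviour the documentation describes).
        TIMEOUT_ON_EVERY_POLL
    }

    /**
     * Simulates a consumer that polls once per simulated second and returns the
     * simulated times (in ms) at which a batch would be handed to the route.
     */
    static List<Long> flushTimes(Policy policy, int recordsPerPoll, int maxPollRecords,
                                 long pollTimeoutMs, long durationMs) {
        List<Long> flushes = new ArrayList<>();
        int buffered = 0;      // records accumulated in the current batch
        long batchStart = 0;   // simulated time at which the current batch started
        for (long now = 1000; now <= durationMs; now += 1000) {
            boolean pollEmpty = recordsPerPoll == 0;
            buffered += recordsPerPoll;
            boolean full = buffered >= maxPollRecords;
            boolean expired = now - batchStart >= pollTimeoutMs;
            boolean timeoutApplies = policy == Policy.TIMEOUT_ON_EVERY_POLL || pollEmpty;
            if (buffered > 0 && (full || (expired && timeoutApplies))) {
                flushes.add(now);
                buffered = 0;
                batchStart = now;
            }
        }
        return flushes;
    }

    public static void main(String[] args) {
        // Values taken from the route in the issue: 100 msgs/s, maxPollRecords=50000,
        // pollTimeoutMs=10000, simulated for 600 seconds.
        for (Policy p : Policy.values()) {
            List<Long> flushes = flushTimes(p, 100, 50_000, 10_000, 600_000);
            System.out.println(p + ": " + flushes.size() + " flush(es), first at "
                    + (flushes.isEmpty() ? "n/a" : flushes.get(0) + " ms"));
        }
    }
}
```

With a steady 100 records per poll, the first policy flushes only once the 50000-record limit is reached (at 500 s), while the second flushes every 10 s, which matches the 500-1000 second wait described in the issue.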



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
