Hi all,

Batching is an integral part of Kafka for achieving good TPS, especially with the transactional design and its expensive commit API. The idea is to send multiple records per transaction.
Below is the working code that sends a single record per transaction:

```java
from("file:input?fileName=input.txt&noop=true")
    .split(body().tokenize("\n")).streaming()
    .to("kafka:topic2?brokers=<broker-ip>:31161" +
        "&requestRequiredAcks=all" +
        "&lingerMs=10" +
        //"&synchronous=true" + //commented
        "&additionalProperties.enable.idempotence=true" +
        "&additionalProperties.transactional.id=newtxn-53" +
        "&additionalProperties.retries=5");
```

The logic above uses one exchange per transaction, which leads to very low TPS. According to the camel-kafka documentation, aggregate() can be used to increase producer performance. But aggregate() doesn't work with a transactional Kafka endpoint in camel-kafka.

Doc: https://camel.apache.org/components/4.10.x/kafka-component.html

Non-working code with aggregate():

```java
from("file:input?fileName=input.txt&noop=true")
    .split(body().tokenize("\n")).streaming()
    //.delay(80)
    .aggregate(constant(true), new GroupedExchangeAggregationStrategy())
    .completionSize(2)
    .completionInterval(100)
    .to("kafka:topic2?brokers=<broker-ip>:31161" +
        "&requestRequiredAcks=all" +
        "&lingerMs=10" +
        //"&synchronous=true" +
        "&additionalProperties.enable.idempotence=true" +
        "&additionalProperties.transactional.id=newtxn-10" +
        "&additionalProperties.retries=5");
```

**Exception:**

10] ProducerId set to 1006 with epoch 0
[com.example.FileToKafkaApp.main()] INFO org.apache.camel.impl.engine.AbstractCamelContext - Routes startup (started:1)
[com.example.FileToKafkaApp.main()] INFO org.apache.camel.impl.engine.AbstractCamelContext - Started route1 (file://input)
[com.example.FileToKafkaApp.main()] INFO org.apache.camel.impl.engine.AbstractCamelContext - Apache Camel 3.21.0 (camel-1) started in 1s523ms (build:63ms init:322ms start:1s138ms)
Camel started. Press Ctrl+C to stop.
After aggregation - Body type: org.apache.camel.processor.aggregate.AbstractListAggregationStrategy$GroupedExchangeList
After aggregation - Body content: List<Exchange>(2 elements)
After aggregation - Body type: org.apache.camel.processor.aggregate.AbstractListAggregationStrategy$GroupedExchangeList
After aggregation - Body content: List<Exchange>(2 elements)
**Exception occurred: TransactionalId newtxn-10: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION**

Debug: After some debugging, I found that adding a small delay (commented out in the route above) makes the route work. So what may be happening is that the aggregate() route thread hands the next batch to the Kafka producer before the producer thread(s) have finished the transaction for the current batch.

Requirement: Is this unexpected behaviour? Should we be able to batch Kafka transactions in Camel this way or in some other way, without writing custom code or a custom processor?
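To make the hypothesis from my debugging concrete, here is a toy model of a transactional producer's state machine. It is only a sketch: `TxnStateModel` and its methods are hypothetical names I made up for illustration, it is not Kafka's or camel-kafka's actual code, and the error text is simply built to mirror the message in the log. It shows that starting a second transaction before the first one is committed is rejected, while serialized batches go through.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a transactional producer (hypothetical, NOT Kafka's real implementation). */
public class TxnStateModel {
    enum State { READY, IN_TRANSACTION }

    private final String transactionalId;
    private State state = State.READY;
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> committed = new ArrayList<>();

    public TxnStateModel(String transactionalId) {
        this.transactionalId = transactionalId;
    }

    /** Starts a transaction; rejected if one is already open. */
    public void begin() {
        if (state != State.READY) {
            throw new IllegalStateException("TransactionalId " + transactionalId
                + ": Invalid transition attempted from state " + state
                + " to state " + State.IN_TRANSACTION);
        }
        state = State.IN_TRANSACTION;
    }

    /** Buffers a record inside the open transaction. */
    public void send(String record) {
        if (state != State.IN_TRANSACTION) {
            throw new IllegalStateException("send() called outside a transaction");
        }
        pending.add(record);
    }

    /** Commits all buffered records as one batch and returns to READY. */
    public void commit() {
        if (state != State.IN_TRANSACTION) {
            throw new IllegalStateException("commit() called outside a transaction");
        }
        committed.add(new ArrayList<>(pending));
        pending.clear();
        state = State.READY;
    }

    public int committedBatches() {
        return committed.size();
    }

    public static void main(String[] args) {
        TxnStateModel producer = new TxnStateModel("newtxn-10");

        // Serialized batches: begin -> send, send -> commit works.
        producer.begin();
        producer.send("line1");
        producer.send("line2");
        producer.commit();

        // Overlapping batches: the next batch arrives before the commit.
        producer.begin();
        producer.send("line3");
        try {
            producer.begin(); // second begin while still IN_TRANSACTION
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If my guess is right, the delay in the route only helps because it spaces the batches far enough apart that each transaction is committed before the next one begins.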