Hi all,

Batching is an integral part of Kafka for better TPS, especially with the 
transactional design and its expensive commit API. The idea is to send multiple 
records per transaction.
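
To put rough numbers on why this matters, here is a back-of-the-envelope sketch. The per-record send cost and per-commit cost are made-up values for illustration only, not measurements from my setup, and `TxnBatchingEstimate` and `recordsPerSecond` are hypothetical names.

```java
public class TxnBatchingEstimate {

    /**
     * Estimated records per second when every transaction carries
     * batchSize records and pays the commit cost once.
     */
    static double recordsPerSecond(int batchSize, double sendMs, double commitMs) {
        double txnMs = batchSize * sendMs + commitMs; // time for one whole transaction
        return batchSize * 1000.0 / txnMs;
    }

    public static void main(String[] args) {
        double sendMs = 0.1;    // assumed cost of sending one record
        double commitMs = 10.0; // assumed cost of one transaction commit

        for (int batch : new int[] {1, 10, 100}) {
            System.out.printf("batch=%d -> %.0f records/s%n",
                    batch, recordsPerSecond(batch, sendMs, commitMs));
        }
    }
}
```

With these assumed costs the commit dominates at batch size 1, and the estimate grows quickly as more records share one commit.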

Below is the working code, which sends a single record per transaction:

```java
from("file:input?fileName=input.txt&noop=true")
            .split(body().tokenize("\n")).streaming()
            .to("kafka:topic2?brokers=<broker-ip>:31161" +
            "&requestRequiredAcks=all" +
            "&lingerMs=10" +
            //"&synchronous=true" +        //commented
            "&additionalProperties.enable.idempotence=true" +
            "&additionalProperties.transactional.id=newtxn-53" +
            "&additionalProperties.retries=5");
```

The above route commits a single exchange per transaction, leading to very low TPS.

As per the camel-kafka doc, aggregate() can be used to increase producer 
performance. But aggregate() doesn't work with a transactional Kafka endpoint 
in camel-kafka.

Doc: https://camel.apache.org/components/4.10.x/kafka-component.html

Non-working code with aggregate():

```java
from("file:input?fileName=input.txt&noop=true")
            .split(body().tokenize("\n")).streaming()
            //.delay(80)
            .aggregate(constant(true), new GroupedExchangeAggregationStrategy())
                .completionSize(2)
                .completionInterval(100)
            .to("kafka:topic2?brokers=<broker-ip>:31161" +
            "&requestRequiredAcks=all" +
            "&lingerMs=10" +
            //"&synchronous=true" +
            "&additionalProperties.enable.idempotence=true" +
            "&additionalProperties.transactional.id=newtxn-10" +
            "&additionalProperties.retries=5");
```

**Exception:**
10] ProducerId set to 1006 with epoch 0
 [com.example.FileToKafkaApp.main()] INFO org.apache.camel.impl.engine.AbstractCamelContext - Routes startup (started:1)
 [com.example.FileToKafkaApp.main()] INFO org.apache.camel.impl.engine.AbstractCamelContext -    Started route1 (file://input)
 [com.example.FileToKafkaApp.main()] INFO org.apache.camel.impl.engine.AbstractCamelContext - Apache Camel 3.21.0 (camel-1) started in 1s523ms (build:63ms init:322ms start:1s138ms)
 Camel started. Press Ctrl+C to stop.
 After aggregation - Body type: org.apache.camel.processor.aggregate.AbstractListAggregationStrategy$GroupedExchangeList
 After aggregation - Body content: List<Exchange>(2 elements)
 After aggregation - Body type: org.apache.camel.processor.aggregate.AbstractListAggregationStrategy$GroupedExchangeList
 After aggregation - Body content: List<Exchange>(2 elements)
**Exception occurred: TransactionalId newtxn-10: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION**

Debug:
After some debugging, I found that adding a small delay (commented out in the 
above route) makes the route work seamlessly. So what may be happening is that 
the aggregate() route thread hands the next batch to the Kafka internal 
producer thread(s) before they have finished with the current batch, so a new 
transaction is started while the previous one is still open.
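
To make that suspicion concrete, here is a toy model of it. This is NOT the Kafka client or camel-kafka code: `ToyProducer`, its two states, and both helper methods are hypothetical names I made up, and the model only assumes that a producer with one transactional id refuses to begin a transaction while another is still open.

```java
public class ToyTxnStateMachine {

    enum State { READY, IN_TRANSACTION }

    /** Toy stand-in for a transactional producer; not the real Kafka client. */
    static class ToyProducer {
        private final String transactionalId;
        private State state = State.READY;

        ToyProducer(String transactionalId) {
            this.transactionalId = transactionalId;
        }

        synchronized void beginTransaction() {
            transitionTo(State.IN_TRANSACTION);
        }

        synchronized void commitTransaction() {
            transitionTo(State.READY);
        }

        // Assumed rule of the model: moving to the state we are already in is illegal.
        private void transitionTo(State target) {
            if (state == target) {
                throw new IllegalStateException("TransactionalId " + transactionalId
                        + ": Invalid transition attempted from state " + state
                        + " to state " + target);
            }
            state = target;
        }
    }

    /** Batch 2 begins before batch 1 has committed. Returns the error message, or null. */
    static String overlappingBatches(ToyProducer producer) {
        try {
            producer.beginTransaction(); // batch 1
            producer.beginTransaction(); // batch 2 arrives too early
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    /** Each batch commits before the next one begins. Returns the error message, or null. */
    static String serializedBatches(ToyProducer producer) {
        try {
            for (int batch = 0; batch < 2; batch++) {
                producer.beginTransaction();
                producer.commitTransaction();
            }
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("overlapping: " + overlappingBatches(new ToyProducer("newtxn-10")));
        System.out.println("serialized:  " + serializedBatches(new ToyProducer("newtxn-10")));
    }
}
```

In this model the overlapping case fails with the same kind of message as in the log above, while the serialized case passes. That would be consistent with the delay helping, but it does not prove that this is what camel-kafka actually does.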

Requirement:
Is this unexpected behaviour? Should we be able to batch Kafka transactions in 
Camel this way or another, without writing custom code or a custom processor?