Camel 3.21.0 is an old version and is no longer supported.

We are now fully focused on 4.x. The current LTS release line is 4.10.x,
and the latest released version is 4.10.4.

I suggest adapting your logic with the help of the migration guide,
testing it against this release, and checking the result.



On Thu, 29 May 2025 at 09:49, Piyush Pahwa
<piyush.pa...@mcarbon.com.invalid> wrote:

>
> Hi all,
>
> Batching is an integral part of Kafka for better TPS, especially with a
> transactional design and its expensive commit API. The idea is to send
> multiple records per transaction.
>
> Below is the working code for a single record per transaction:
>
> ```java
> from("file:input?fileName=input.txt&noop=true")
>             .split(body().tokenize("\n")).streaming()
>             .to("kafka:topic2?brokers=<broker-ip>:31161" +
>             "&requestRequiredAcks=all" +
>             "&lingerMs=10" +
>             //"&synchronous=true" +        //commented
>             "&additionalProperties.enable.idempotence=true" +
>             "&additionalProperties.transactional.id=newtxn-53" +
>             "&additionalProperties.retries=5");
> ```
>
> The logic above uses a single exchange per transaction, which leads to
> very low TPS.
>
> As per the camel-kafka doc, aggregate() can be used to increase producer
> performance. But aggregate() doesn't work with a transactional Kafka
> endpoint in camel-kafka.
>
> Doc: https://camel.apache.org/components/4.10.x/kafka-component.html
>
> Non-working code with aggregate():
>
> ```java
> from("file:input?fileName=input.txt&noop=true")
>             .split(body().tokenize("\n")).streaming()
>              //.delay(80)
>              .aggregate(constant(true), new GroupedExchangeAggregationStrategy())
>                 .completionSize(2)
>                 .completionInterval(100)
>             .to("kafka:topic2?brokers=<broker-ip>:31161" +
>             "&requestRequiredAcks=all" +
>             "&lingerMs=10" +
>             //"&synchronous=true" +
>             "&additionalProperties.enable.idempotence=true" +
>             "&additionalProperties.transactional.id=newtxn-10" +
>             "&additionalProperties.retries=5");
> ```
>
> **Exception:**
> 10] ProducerId set to 1006 with epoch 0
>  [com.example.FileToKafkaApp.main()] INFO
> org.apache.camel.impl.engine.AbstractCamelContext - Routes startup
> (started:1)
>  [com.example.FileToKafkaApp.main()] INFO
> org.apache.camel.impl.engine.AbstractCamelContext -    Started route1
> (file://input)
>  [com.example.FileToKafkaApp.main()] INFO
> org.apache.camel.impl.engine.AbstractCamelContext - Apache Camel 3.21.0
> (camel-1) started in 1s523ms (build:63ms init:322ms start:1s138ms)
>  Camel started. Press Ctrl+C to stop.
>  After aggregation - Body type:
> org.apache.camel.processor.aggregate.AbstractListAggregationStrategy$GroupedExchangeList
>  After aggregation - Body content: List<Exchange>(2 elements)
>  After aggregation - Body type:
> org.apache.camel.processor.aggregate.AbstractListAggregationStrategy$GroupedExchangeList
>  After aggregation - Body content: List<Exchange>(2 elements)
> **Exception occurred: TransactionalId newtxn-10: Invalid transition
> attempted from state IN_TRANSACTION to state IN_TRANSACTION**
>
> Debug:
> After some debugging, I found that adding a small delay (commented out in
> the route above) makes the route work reliably. So what may be happening
> is this: the aggregate() route thread hands the next batch to Kafka's
> internal producer thread(s) faster than they can finish with the existing
> batch.
>
> Requirement:
> Is this unexpected behaviour? Should we be able to batch Kafka
> transactions in Camel this way or in some other way, without writing
> custom code or a processor?
>
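The hypothesis in the quoted Debug section (a new batch arriving before the previous transaction has finished) can be illustrated with a toy model. The sketch below is plain Java and uses neither Kafka nor Camel: `ToyTxnProducer` and its methods are hypothetical names that only mimic the shape of a transactional producer, under the assumption that a transaction may begin only from a ready state. It is not Kafka's actual implementation and not a confirmed diagnosis of the route above.

```java
import java.util.ArrayList;
import java.util.List;

// Demo entry point: one well-ordered run and one overlapping run.
final class TxnStateDemo {
    public static void main(String[] args) {
        // Non-overlapping cycles: two records per transaction, two transactions.
        ToyTxnProducer ordered = new ToyTxnProducer("newtxn-10");
        ordered.beginTransaction();
        ordered.send("a");
        ordered.send("b");
        ordered.commitTransaction();
        ordered.beginTransaction();
        ordered.send("c");
        ordered.send("d");
        ordered.commitTransaction();
        System.out.println(ordered.committedBatches()); // prints [[a, b], [c, d]]

        // Overlap: a second begin arrives before the first commit.
        ToyTxnProducer overlapping = new ToyTxnProducer("newtxn-10");
        overlapping.beginTransaction();
        overlapping.send("a");
        try {
            overlapping.beginTransaction();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}

// Toy model of a transactional producer's state guard (hypothetical, simplified).
final class ToyTxnProducer {
    enum State { READY, IN_TRANSACTION }

    private final String transactionalId;
    private State state = State.READY;
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> committed = new ArrayList<>();

    ToyTxnProducer(String transactionalId) {
        this.transactionalId = transactionalId;
    }

    // Assumption of this model: begin is legal only from READY.
    void beginTransaction() {
        transition(State.READY, State.IN_TRANSACTION);
    }

    // Records are buffered until the enclosing transaction commits.
    void send(String record) {
        if (state != State.IN_TRANSACTION) {
            throw new IllegalStateException("send() called outside a transaction");
        }
        pending.add(record);
    }

    // Commit is legal only from IN_TRANSACTION; it flushes the buffered batch.
    void commitTransaction() {
        transition(State.IN_TRANSACTION, State.READY);
        committed.add(new ArrayList<>(pending));
        pending.clear();
    }

    List<List<String>> committedBatches() {
        return committed;
    }

    private void transition(State requiredSource, State target) {
        if (state != requiredSource) {
            throw new IllegalStateException("TransactionalId " + transactionalId
                    + ": Invalid transition attempted from state " + state
                    + " to state " + target);
        }
        state = target;
    }
}
```

In this model the second run fails with a message of the same form as the one in the quoted log, which would be consistent with the observation that adding a delay lets each transaction finish before the next batch arrives. Whether that is what happens inside camel-kafka 3.21.0 is an open question that testing against 4.10.x should help answer.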
