Hi Otavio, Camel version is 3.17.0 thank you
On Wed, Nov 2, 2022 at 7:00 PM Otavio Rodolfo Piske <[email protected]> wrote: > Hi Ivan, > > For Kafka (consumer) objects, yes, they should be created for every route. > I want to try to take a look at it during this week or in the next. > > In the meantime, can you please tell me which version of Camel you are > using? > > Kind regards > > On Wed, Oct 26, 2022 at 6:50 PM Ivan <[email protected]> wrote: > > > Hi Oktavio thank you for your response. > > > > The commit is made synchronously by the kafkaOffsetProcessor (whose code > I > > forgot to attach, more details here > > > https://stackoverflow.com/questions/74096096/camel-and-kafka-manual-commit-java-util-concurrentmodificationexception > ). > > Basically it does what the documentation says for synchronous commits. > > > > Multiple threads cannot access the same kafka client that’s it, but camel > > handles instances and threads, so not so easy to fix for me. > > > > I think about seda because I know that for seda endpoint object are > pooled > > (if my understanding is right). > > > > Are you sure that new objects are created for every route created from a > > template ? > > > > Ivan Rododendro > > > > > Le 24 oct. 2022 à 15:40, Otavio Rodolfo Piske <[email protected]> a > > écrit : > > > > > > Hi, > > > > > > From the code you provided, it's not very clear to me when and where > you > > > are calling the commit. Also it's not very clear to me: which version > of > > > Camel you are using and which kind of commit factory you are using > (async > > > [1] or sync [2]?). > > > > > > That said ...The problem here is that - as explained in the exception > > > message - the Kafka client cannot be accessed from a different thread. > > > > > > So, I am not entirely sure that the problem is related to seda or > > something > > > like that. Also, Camel will indeed, create a different consumer for > every > > > route. > > > > > > Please, can you provide a bit more details about the code you have? > > > > > > 1. > > > > > > https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerAsyncCommitIT.java#L51-L53 > > > 2. > > > > > > https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncCommitIT.java#L51-L53 > > > > > >> On Fri, Oct 21, 2022 at 9:12 AM Ivan Rododendro < > > [email protected]> > > >> wrote: > > >> > > >> Hello > > >> I'm really new to Camel concepts, our need is to create some identical > > >> routes, identical except for some parameters, from a Kafka topic to a > > http > > >> endpoint, with some processing in-between. > > >> > > >> Besides this we want to explicitly commit the message consumption only > > when > > >> the http endpoint has been successfully called. > > >> > > >> In order to achieve this we set up a route template that carries the > > Route > > >> parameterization and set it up to manually commit after having called > > the > > >> http endpoint : > > >> public void configure() throws Exception { > > >> // @formatter:off > > >> routeTemplate(Constantes.KAFKA_GENERIC_ROUTE) > > >> .templateParameter(Constantes.JOB_NAME) > > >> .templateParameter(Constantes.TOPIC) > > >> .templateParameter(Constantes.PUBLISHER_ID) > > >> .templateParameter(Constantes.CORRELATION_ID_PARAMETER) > > >> .templateParameter(Constantes.JOB_NAME_PARAMETER) > > >> .templateParameter(Constantes.CORRELATION_ID_PARAMETER) > > >> .from(getKafkaEndpoint()) > > >> .messageHistory() > > >> .filter(simple("${header.publisherId} == > '{{publisherId}}'")) > > >> .process(messageLoggerProcessor) > > >> .process(modelMapperProcessor) > > >> .process(jsonlToArrayProcessor) > > >> .process(payloadProcessor) > > >> > > >> > .resequence(header("dmlTimestamp")).batch().timeout(maximumRequestCount) > > >> .setHeader(Exchange.HTTP_METHOD, simple("POST")) > > >> .setHeader(Exchange.CONTENT_TYPE, > > >> constant("application/json;charset=UTF-8")) > > >> .setHeader(Constantes.ACCEPT,constant("application/json")) > > >> .setHeader(Constantes.API_KEY, constant(apiKey)) > > >> > > >> > > >> > > > .throttle(maximumRequestCount).timePeriodMillis(timePeriodMillis).asyncDelayed(true) > > >> .process(apiConsumerProcessorLogger) > > >> .to(this.url) > > >> .process(kafkaOffsetProcessor); > > >> // @formatter:on > > >> } > > >> > > >> private String getKafkaEndpoint() { > > >> String endpoint = > > >> > > "kafka:{{topic}}?allowManualCommit=true&autoCommitEnable=false&brokers=" > + > > >> kafkaBrokers; > > >> > > >> if (securityEnabled()) { > > >> endpoint += "&securityProtocol=SASL_SSL" + > > >> "&saslMechanism=PLAIN" > > >> + > > >> > "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule > > >> required username=\"" > > >> + username + "\" password=\"" + password + "\";" + > > >> "&sslTruststoreLocation=" + sslTrustStoreLocation > > >> + "&sslTruststorePassword=" + > sslTruststorePassword; > > >> } > > >> > > >> return endpoint; > > >> } > > >> > > >> The problem is that we systematically get this error when a message is > > >> consumed by a route : > > >> > > >> Trace: java.util.ConcurrentModificationException: KafkaConsumer is not > > >> safe for multi-threaded access > > >> at > > >> > > > org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824) > > >> at > > >> > > > org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808) > > >> at > > >> > > > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1255) > > >> at > > >> > > > org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitOffset(DefaultKafkaManualCommit.java:60) > > >> at > > >> > > > org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitSync(DefaultKafkaManualCommit.java:51) > > >> > > >> > > >> My understanding is that the instance of KafkaConsumer is reused in > > >> multiple routes and therefore it generates the error, but it could be > > also > > >> related to using SEDA endpoint as stated here ( > > >> https://issues.apache.org/jira/browse/CAMEL-12722), which we don't > > >> explicitly do. > > >> > > >> We tried injecting a KafkaComponent local bean in the route : > > >> > > >> > > >> > > > .templateBean("myKafkaConfiguration").typeClass("org.apache.camel.component.kafka.KafkaConfiguration").property("topic", > > >> "{{" + Constantes.TOPIC +"}}").properties(kafkaConfiguration) > > >> .end() > > >> > > >> > > > .templateBean("myKafka").typeClass("org.apache.camel.component.kafka.KafkaComponent").property("configuration", > > >> "#{{myKafkaConfiguration}}") > > >> .end() > > >> .from("#{{myKafka}}") > > >> > > >> But it ends up with another error because you cannot consume a Bean > > >> endpoint ( > > https://camel.apache.org/components/3.18.x/bean-component.html) > > >> > > >> How to use a different KafkaConsumer for every created route ? Or, if > > the > > >> issue is SEDA related, how to make this route a direct route? > > >> > > >> Thank you for your help > > >> > > > > > > > > > -- > > > Otavio R. Piske > > > http://orpiske.net > > > > > -- > Otavio R. Piske > http://orpiske.net >
