Yes, the kafka console consumer displays the message correctly. I also tested the same with a Java application, it works fine. There seems to be an issue with Camel route trying to consume.
There is no error in the console. But, the logs show as below: kafka.KafkaCamelTestConsumer Connected to the target VM, address: '127.0.0.1:65007', transport: 'socket' PID_IS_UNDEFINED: INFO DefaultCamelContext - Apache Camel 2.17.0 (CamelContext: camel-1) is starting PID_IS_UNDEFINED: INFO ManagedManagementStrategy - JMX is enabled PID_IS_UNDEFINED: INFO DefaultTypeConverter - Loaded 183 type converters PID_IS_UNDEFINED: INFO DefaultRuntimeEndpointRegistry - Runtime endpoint registry is in extended mode gathering usage statistics of all incoming and outgoing endpoints (cache limit: 1000) PID_IS_UNDEFINED: INFO DefaultCamelContext - AllowUseOriginalMessage is enabled. If access to the original message is not needed, then its recommended to turn this option off as it may improve performance. PID_IS_UNDEFINED: INFO DefaultCamelContext - StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html PID_IS_UNDEFINED: INFO KafkaConsumer - Starting Kafka consumer PID_IS_UNDEFINED: INFO ConsumerConfig - ConsumerConfig values: auto.commit.interval.ms = 5000 auto.offset.reset = earliest bootstrap.servers = [localhost:9092] check.crcs = true client.id = connections.max.idle.ms = 540000 enable.auto.commit = true exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1024 group.id = testing heartbeat.interval.ms = 3000 interceptor.classes = null key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.sample.window.ms = 30000 partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 32768 reconnect.backoff.ms = 50 request.timeout.ms = 40000 retry.backoff.ms = 100 sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072 session.timeout.ms = 30000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer PID_IS_UNDEFINED: INFO ConsumerConfig - ConsumerConfig values: auto.commit.interval.ms = 5000 auto.offset.reset = earliest bootstrap.servers = [localhost:9092] check.crcs = true client.id = consumer-1 connections.max.idle.ms = 540000 enable.auto.commit = true exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1024 group.id = testing heartbeat.interval.ms = 3000 interceptor.classes = null key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.sample.window.ms = 30000 partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 32768 reconnect.backoff.ms = 50 request.timeout.ms = 40000 retry.backoff.ms = 100 sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072 session.timeout.ms = 30000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer PID_IS_UNDEFINED: INFO AppInfoParser - Kafka version : 0.10.1.0 PID_IS_UNDEFINED: INFO AppInfoParser - Kafka commitId : 3402a74efb23d1d4 PID_IS_UNDEFINED: INFO DefaultCamelContext - Route: route1 started and consuming from: Endpoint[kafka://localhost:9092?autoOffsetReset=earliest&consumersCount=1&groupId=testing&topic=test] PID_IS_UNDEFINED: INFO DefaultCamelContext - Total 1 routes, of which 1 are started. PID_IS_UNDEFINED: INFO DefaultCamelContext -----Original Message----- From: Ewen Cheslack-Postava [mailto:e...@confluent.io] Sent: Friday, 6 January 2017 3:58 PM To: users@kafka.apache.org Subject: Re: Apache Kafka integration using Apache Camel More generally, do you have any log errors/messages or additional info? It's tough to debug issues like this from 3rd party libraries if they don't provide logs/exception info that indicates why processing a specific message failed. -Ewen On Thu, Jan 5, 2017 at 8:29 PM, UMESH CHAUDHARY <umesh9...@gmail.com> wrote: > Did you test that kafka console consumer is displaying the produced > message? > > On Fri, Jan 6, 2017 at 9:18 AM, Gupta, Swati <swati.gu...@anz.com> wrote: > > > Hello All, > > > > > > > > I am trying to create a Consumer using Apache Camel for a topic in > > Apache Kafka. > > I am using Camel 2.17.0 and Kafka 0.10 and JDK 1.8. > > I have attached a file, KafkaCamelTestConsumer.java which is a > > standalone application trying to read from a topic “test1”created > > in Apache Kafka I am producing messages from the console and also > > was successful to produce messages using a Camel program in the > > topic "test1", but not able to consume messages. Ideally, it should > > get printed, but nothing seems to happen. The log says that the > > route has started but does not process any message. > > > > Please help to confirm if there is anything wrong with the below syntax: > > > > from(*"kafka:localhost:9092?topic=test1&groupId=testingGroupNew& > autoOffsetReset=earliest" > > *+ > > > > *"&consumersCount=1&keyDeserializer=org.apache. > kafka.common.serialization.StringDeserializer&" > > *+ > > *"valueDeserializer=org.apache.kafka.common.serialization. > StringDeserializer" > > *+ > > *"&autoCommitIntervalMs=1000&sessionTimeoutMs=30000& > autoCommitEnable=true"* > > ).split() > > .body() > > .process(*new *Processor() { > > @Override > > *public void *process(Exchange exchange) > > *throws *Exception { > > String messageKey = *""*; > > *if *(exchange.getIn() != *null*) { > > Message message = exchange.getIn(); > > Integer partitionId = (Integer) message > > .getHeader(KafkaConstants.* > PARTITION* > > ); > > String topicName = (String) message > > .getHeader(KafkaConstants.*TOPIC*); > > *if *(message.getHeader( > KafkaConstants.*KEY*) > > != *null*) > > messageKey = (String) message > > .getHeader(KafkaConstants.* > KEY*); > > Object data = message.getBody(); > > > > > > System.*out*.println( > > *"topicName :: " *+ topicName + > > *" partitionId :: " *+ partitionId + > > *" messageKey :: " *+ messageKey + > > *" message :: " *+ data + > *"**\n**"*); > > } > > } > > }).to( > > *"file://C:/swati/?fileName=MyOutputFile.txt&charset=utf-8"*); > > } > > }); > > > > > > > > I have also tried with the basic parameters as below and it still > > fails > to > > read messages. > > > > from( > > *"kafka:localhost:9092?topic=test1&groupId=testingGroupNew& > autoOffsetReset=earliest")* > > > > Any help on this will be greatly appreciated. > > > > Thanks in advance > > > > > > > > Thanks & Regards > > > > Swati > > > > ------------------------------ > > This e-mail and any attachments to it (the "Communication") is, > > unless otherwise stated, confidential, may contain copyright > > material and is for the use only of the intended recipient. If you > > receive the Communication > in > > error, please notify the sender immediately by return e-mail, delete > > the Communication and the return e-mail, and do not read, copy, > > retransmit or otherwise deal with it. Any views expressed in the > > Communication are > those > > of the individual sender only, unless expressly stated to be those > > of Australia and New Zealand Banking Group Limited ABN 11 005 357 > > 522, or > any > > of its related entities including ANZ Bank New Zealand Limited > > (together "ANZ"). ANZ does not accept liability in connection with > > the integrity of or errors in the Communication, computer virus, > > data corruption, interference or delay arising from or in respect of the > > Communication. > > > > > This e-mail and any attachments to it (the "Communication") is, unless otherwise stated, confidential, may contain copyright material and is for the use only of the intended recipient. If you receive the Communication in error, please notify the sender immediately by return e-mail, delete the Communication and the return e-mail, and do not read, copy, retransmit or otherwise deal with it. Any views expressed in the Communication are those of the individual sender only, unless expressly stated to be those of Australia and New Zealand Banking Group Limited ABN 11 005 357 522, or any of its related entities including ANZ Bank New Zealand Limited (together "ANZ"). ANZ does not accept liability in connection with the integrity of or errors in the Communication, computer virus, data corruption, interference or delay arising from or in respect of the Communication.