More generally, do you have any log errors or messages, or any additional info? It's tough to debug issues like this in third-party libraries if they don't provide logs or exception info indicating why processing a specific message failed.
-Ewen

On Thu, Jan 5, 2017 at 8:29 PM, UMESH CHAUDHARY <umesh9...@gmail.com> wrote:

> Did you test that kafka console consumer is displaying the produced
> message?
>
> On Fri, Jan 6, 2017 at 9:18 AM, Gupta, Swati <swati.gu...@anz.com> wrote:
>
> > Hello All,
> >
> > I am trying to create a Consumer using Apache Camel for a topic in
> > Apache Kafka.
> > I am using Camel 2.17.0 and Kafka 0.10 and JDK 1.8.
> > I have attached a file, KafkaCamelTestConsumer.java which is a
> > standalone application trying to read from a topic “test1” created in
> > Apache Kafka.
> > I am producing messages from the console and also was successful to
> > produce messages using a Camel program in the topic "test1", but not
> > able to consume messages. Ideally, it should get printed, but nothing
> > seems to happen. The log says that the route has started but does not
> > process any message.
> >
> > Please help to confirm if there is anything wrong with the below syntax:
> >
> > from("kafka:localhost:9092?topic=test1&groupId=testingGroupNew&autoOffsetReset=earliest"
> >         + "&consumersCount=1&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
> >         + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
> >         + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true")
> >     .split()
> >     .body()
> >     .process(new Processor() {
> >         @Override
> >         public void process(Exchange exchange) throws Exception {
> >             String messageKey = "";
> >             if (exchange.getIn() != null) {
> >                 Message message = exchange.getIn();
> >                 Integer partitionId = (Integer) message.getHeader(KafkaConstants.PARTITION);
> >                 String topicName = (String) message.getHeader(KafkaConstants.TOPIC);
> >                 if (message.getHeader(KafkaConstants.KEY) != null)
> >                     messageKey = (String) message.getHeader(KafkaConstants.KEY);
> >                 Object data = message.getBody();
> >
> >                 System.out.println("topicName :: " + topicName
> >                         + " partitionId :: " + partitionId
> >                         + " messageKey :: " + messageKey
> >                         + " message :: " + data + "\n");
> >             }
> >         }
> >     }).to("file://C:/swati/?fileName=MyOutputFile.txt&charset=utf-8");
> > }
> > });
> >
> > I have also tried with the basic parameters as below and it still fails
> > to read messages.
> >
> > from("kafka:localhost:9092?topic=test1&groupId=testingGroupNew&autoOffsetReset=earliest")
> >
> > Any help on this will be greatly appreciated.
> >
> > Thanks in advance
> >
> > Thanks & Regards
> >
> > Swati
> >
> > ------------------------------
> > This e-mail and any attachments to it (the "Communication") is, unless
> > otherwise stated, confidential, may contain copyright material and is
> > for the use only of the intended recipient. If you receive the
> > Communication in error, please notify the sender immediately by return
> > e-mail, delete the Communication and the return e-mail, and do not
> > read, copy, retransmit or otherwise deal with it. Any views expressed
> > in the Communication are those of the individual sender only, unless
> > expressly stated to be those of Australia and New Zealand Banking Group
> > Limited ABN 11 005 357 522, or any of its related entities including
> > ANZ Bank New Zealand Limited (together "ANZ"). ANZ does not accept
> > liability in connection with the integrity of or errors in the
> > Communication, computer virus, data corruption, interference or delay
> > arising from or in respect of the Communication.
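
Because the endpoint URI in the quoted message is concatenated from several string pieces (and was re-wrapped by mail clients), it can be hard to see exactly what string the route receives. The sketch below is one possible way to assemble and print the same URI from an ordered map so it can be inspected in one piece. It does not use Camel or Kafka at all; the class `KafkaEndpointUri` and its methods `build` and `originalOptions` are hypothetical names introduced here for illustration. The option names and values are copied from the message above and have not been checked against the Camel 2.17.0 component, and nothing here is claimed to be the cause of the missing messages.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of Camel: joins endpoint options with '&'
// and rejects any whitespace that a line break might have introduced.
class KafkaEndpointUri {

    // Builds "kafka:<brokers>?k1=v1&k2=v2..." in the map's iteration order.
    static String build(String brokers, Map<String, String> options) {
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> e : options.entrySet()) {
            query.add(e.getKey() + "=" + e.getValue());
        }
        String uri = "kafka:" + brokers + "?" + query;
        if (uri.chars().anyMatch(Character::isWhitespace)) {
            throw new IllegalArgumentException("whitespace in endpoint URI: " + uri);
        }
        return uri;
    }

    // The options exactly as they appear in the quoted route (assumed, not verified).
    static Map<String, String> originalOptions() {
        String stringDeserializer =
                "org.apache.kafka.common.serialization.StringDeserializer";
        Map<String, String> options = new LinkedHashMap<>();
        options.put("topic", "test1");
        options.put("groupId", "testingGroupNew");
        options.put("autoOffsetReset", "earliest");
        options.put("consumersCount", "1");
        options.put("keyDeserializer", stringDeserializer);
        options.put("valueDeserializer", stringDeserializer);
        options.put("autoCommitIntervalMs", "1000");
        options.put("sessionTimeoutMs", "30000");
        options.put("autoCommitEnable", "true");
        return options;
    }

    public static void main(String[] args) {
        // Print the assembled URI so it can be compared with what the route uses.
        System.out.println(build("localhost:9092", originalOptions()));
    }
}
```

The resulting string could then be passed to `from(...)` in place of the hand-concatenated literal, which at least rules out a stray space or a doubled `&` as a factor.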