Did you verify that the Kafka console consumer displays the produced messages?
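
Separately, it may be worth ruling out a typo in the hand-concatenated endpoint URI from the quoted message. The sketch below is only an illustration: `KafkaEndpointUri` and `build` are hypothetical names, not part of Camel, and the option names and values are copied from your message without checking them against the Camel 2.17.0 component. It builds the query string from an ordered map so that no `&` separator can be dropped or doubled, and it does not claim to explain why the route receives nothing.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not a Camel class): assembles a "kafka:" endpoint URI
// from an ordered option map so that separators cannot be dropped or doubled.
class KafkaEndpointUri {

    // Values are appended as-is; this sketch assumes they need no URL encoding.
    static String build(String brokers, Map<String, String> options) {
        if (options.isEmpty()) {
            return "kafka:" + brokers;
        }
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> option : options.entrySet()) {
            query.add(option.getKey() + "=" + option.getValue());
        }
        return "kafka:" + brokers + "?" + query;
    }

    public static void main(String[] args) {
        // Option names and values taken from the quoted message.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("topic", "test1");
        options.put("groupId", "testingGroupNew");
        options.put("autoOffsetReset", "earliest");
        options.put("consumersCount", "1");
        options.put("keyDeserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        options.put("valueDeserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        // Print the URI so it can be compared with the one passed to from(...).
        System.out.println(build("localhost:9092", options));
    }
}
```

The printed string can be passed to `from(...)` unchanged, which makes it easy to add or remove one option at a time while testing.
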
On Fri, Jan 6, 2017 at 9:18 AM, Gupta, Swati <swati.gu...@anz.com> wrote:

> Hello All,
>
> I am trying to create a Consumer using Apache Camel for a topic in Apache
> Kafka.
> I am using Camel 2.17.0 and Kafka 0.10 and JDK 1.8.
> I have attached a file, KafkaCamelTestConsumer.java which is a standalone
> application trying to read from a topic "test1" created in Apache Kafka.
> I am producing messages from the console and also was successful to
> produce messages using a Camel program in the topic "test1", but not able
> to consume messages. Ideally, it should get printed, but nothing seems to
> happen. The log says that the route has started but does not process any
> message.
>
> Please help to confirm if there is anything wrong with the below syntax:
>
> from("kafka:localhost:9092?topic=test1&groupId=testingGroupNew&autoOffsetReset=earliest" +
>         "&consumersCount=1&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&" +
>         "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer" +
>         "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true")
>     .split()
>     .body()
>     .process(new Processor() {
>         @Override
>         public void process(Exchange exchange) throws Exception {
>             String messageKey = "";
>             if (exchange.getIn() != null) {
>                 Message message = exchange.getIn();
>                 Integer partitionId = (Integer) message.getHeader(KafkaConstants.PARTITION);
>                 String topicName = (String) message.getHeader(KafkaConstants.TOPIC);
>                 if (message.getHeader(KafkaConstants.KEY) != null)
>                     messageKey = (String) message.getHeader(KafkaConstants.KEY);
>                 Object data = message.getBody();
>
>                 System.out.println(
>                         "topicName :: " + topicName +
>                         " partitionId :: " + partitionId +
>                         " messageKey :: " + messageKey +
>                         " message :: " + data + "\n");
>             }
>         }
>     }).to("file://C:/swati/?fileName=MyOutputFile.txt&charset=utf-8");
> }
> });
>
> I have also tried with the basic parameters as below and it still fails to
> read messages.
>
> from("kafka:localhost:9092?topic=test1&groupId=testingGroupNew&autoOffsetReset=earliest")
>
> Any help on this will be greatly appreciated.
>
> Thanks in advance
>
> Thanks & Regards
> Swati