Did you check that the Kafka console consumer displays the produced
messages?

On Fri, Jan 6, 2017 at 9:18 AM, Gupta, Swati <swati.gu...@anz.com> wrote:

> Hello All,
>
>
>
> I am trying to create a consumer using Apache Camel for a topic in Apache
> Kafka.
> I am using Camel 2.17.0, Kafka 0.10, and JDK 1.8.
> I have attached a file, KafkaCamelTestConsumer.java, which is a standalone
> application that tries to read from a topic "test1" created in Apache Kafka.
> I am producing messages from the console, and I was also able to produce
> messages to the topic "test1" using a Camel program, but I am not able to
> consume messages. Ideally, each message should get printed, but nothing
> seems to happen. The log says that the route has started, but it does not
> process any message.
>
> Please help confirm whether there is anything wrong with the syntax below:
>
> from("kafka:localhost:9092?topic=test1&groupId=testingGroupNew&autoOffsetReset=earliest"
>         + "&consumersCount=1&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
>         + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
>         + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true")
>                 .split()
>                 .body()
>                 .process(new Processor() {
>                     @Override
>                     public void process(Exchange exchange)
>                             throws Exception {
>                         String messageKey = "";
>                         if (exchange.getIn() != null) {
>                             Message message = exchange.getIn();
>                             Integer partitionId = (Integer) message
>                                     .getHeader(KafkaConstants.PARTITION);
>                             String topicName = (String) message
>                                     .getHeader(KafkaConstants.TOPIC);
>                             if (message.getHeader(KafkaConstants.KEY) != null)
>                                 messageKey = (String) message
>                                         .getHeader(KafkaConstants.KEY);
>                             Object data = message.getBody();
>
>                             System.out.println(
>                                     "topicName :: " + topicName
>                                     + " partitionId :: " + partitionId
>                                     + " messageKey :: " + messageKey
>                                     + " message :: " + data + "\n");
>                         }
>                     }
>                 }).to("file://C:/swati/?fileName=MyOutputFile.txt&charset=utf-8");
>     }
> });
>
>
>
> I have also tried with the basic parameters as below and it still fails to
> read messages.
>
> from("kafka:localhost:9092?topic=test1&groupId=testingGroupNew&autoOffsetReset=earliest")
>
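
A side note on the URI itself: it is assembled by hand from several string
fragments, so a missing or doubled "&" is easy to overlook. Here is a minimal,
JDK-only sketch of building the same kind of URI from a map of options. The
class and method names (KafkaEndpointUri, build) are made up for illustration
and are not part of Camel. The values are not URL-encoded, which I am assuming
is acceptable for the plain values used here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not a Camel API: it only concatenates strings.
public class KafkaEndpointUri {

    // Joins the options as key=value pairs separated by '&', in insertion order.
    // Values are used as-is (no URL encoding).
    public static String build(String brokers, Map<String, String> options) {
        if (options.isEmpty()) {
            return "kafka:" + brokers;
        }
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> option : options.entrySet()) {
            query.add(option.getKey() + "=" + option.getValue());
        }
        return "kafka:" + brokers + "?" + query;
    }

    public static void main(String[] args) {
        String deserializer =
                "org.apache.kafka.common.serialization.StringDeserializer";

        // Same option names and values as in the quoted route.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("topic", "test1");
        options.put("groupId", "testingGroupNew");
        options.put("autoOffsetReset", "earliest");
        options.put("consumersCount", "1");
        options.put("keyDeserializer", deserializer);
        options.put("valueDeserializer", deserializer);
        options.put("autoCommitIntervalMs", "1000");
        options.put("sessionTimeoutMs", "30000");
        options.put("autoCommitEnable", "true");

        // Print the URI so it can be compared with the hand-written one.
        System.out.println(build("localhost:9092", options));
    }
}
```

The resulting string could then be passed to from(...) in place of the
concatenated literals. This only rules out separator mistakes; it does not by
itself explain why no messages are consumed.
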
> Any help on this will be greatly appreciated.
>
> Thanks in advance
>
>
>
> Thanks & Regards
>
> Swati
>
