Hey all,

Evaluating Samza currently and am running into some odd issues.

I'm currently working off the 'hello-samza' repo and trying to parse a
simple kafka topic that I've produced through an extenal java app (nothing
other than a series of sentences) and it's failing pretty hard for me. The
base 'hello-samza' set of apps works fine, but as soon as I change the
configuration to look at a different Kafka/zookeeper I get the following in
the userlogs:

2015-03-22 17:07:09 KafkaSystemAdmin [WARN] Unable to fetch last offsets
for streams [myTopic] due to kafka.common.KafkaException: fetching topic
metadata for topics [Set(myTopic)] from broker
[ArrayBuffer(id:0,host:redacted,port:9092)] failed. Retrying.


The modifications are pretty straightforward.  In the
Wikipedia-parser.properties, I've changed the following:
task.inputs=kafka.myTopic
systems.kafka.consumer.zookeeper.connect=redacted:2181/
systems.kafka.consumer.auto.offset.reset=smallest
systems.kafka.producer.metadata.broker.list=redacted:9092

and in the actual java file WikipediaParserStreamTask.java
  public void process(IncomingMessageEnvelope envelope, MessageCollector
collector, TaskCoordinator coordinator) {
    Map<String, Object> jsonObject = (Map<String, Object>)
envelope.getMessage();
    WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);

    try {
        System.out.println(event.getRawEvent());

And then following the compile/extract/run process outlined in the
hello-samza website.

Any thoughts?  I've looked online for any 'super simple' examples of
ingesting kafka in samza with very little success.

Reply via email to