Just to be clear, here's what's changed from the default hello-samza repo:

wikipedia-parser.properties
===========================
task.inputs=kafka.myTopic
systems.kafka.consumer.zookeeper.connect=ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com:2181/
systems.kafka.consumer.auto.offset.reset=smallest
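One note on the config above: the messages in 'myTopic' are plain sentences, while hello-samza's wikipedia-parser job expects JSON. A possible addition would be to register a string serde for the Kafka system, along these lines (property names follow Samza's documented serde convention; the registry key "string" and its use here are an assumption, so verify against your Samza version):

```properties
# Assumed addition: decode message bodies as UTF-8 strings, since the topic
# contains plain sentences rather than JSON.
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
systems.kafka.samza.msg.serde=string
```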
WikipediaParserStreamTask.java
==============================
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
    Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
    WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);

    try {
      System.out.println(event.getRawEvent());
      // Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
      // parsedJsonObject.put("channel", event.getChannel());
      // parsedJsonObject.put("source", event.getSource());
      // parsedJsonObject.put("time", event.getTime());
      // collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-edits"), parsedJsonObject));
    } catch (Exception e) {
      // closing brace/catch elided in the original mail
    }
  }

as well as the aforementioned changes to the log4j.xml file. The data pushed into the 'myTopic' topic is nothing more than a sentence.

On Mon, Mar 23, 2015 at 4:16 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:

> yep, modified log4j.xml to look like this:
>
> <root>
>   <priority value="debug" />
>   <appender-ref ref="RollingAppender"/>
>   <appender-ref ref="jmx" />
> </root>
>
> Not sure what you mean by #2.
>
> However, I'm running now, not seeing any exceptions, but still not seeing
> any output from System.out.println(...)
>
> On Mon, Mar 23, 2015 at 11:29 AM, Naveen Somasundaram
> <nsomasunda...@linkedin.com.invalid> wrote:
>
>> Hey Ash,
>> 1. Did you happen to modify your log4j.xml ?
>> 2. Can you print the class path that was printed when the job started ?
>> I am wondering if log4j was not loaded or not present in the path where
>> it's looking. If you have been using hello-samza, it should have pulled
>> it from Maven.
>>
>> Thanks,
>> Naveen
>>
>> On Mar 22, 2015, at 10:35 AM, Ash W Matheson <ash.mathe...@gmail.com>
>> wrote:
>>
>> > Hey all,
>> >
>> > Evaluating Samza currently and am running into some odd issues.
>> >
>> > I'm currently working off the 'hello-samza' repo and trying to parse a
>> > simple Kafka topic that I've produced through an external java app
>> > (nothing other than a series of sentences), and it's failing pretty
>> > hard for me. The base 'hello-samza' set of apps works fine, but as soon
>> > as I change the configuration to look at a different Kafka/ZooKeeper, I
>> > get the following in the userlogs:
>> >
>> > 2015-03-22 17:07:09 KafkaSystemAdmin [WARN] Unable to fetch last offsets
>> > for streams [myTopic] due to kafka.common.KafkaException: fetching topic
>> > metadata for topics [Set(myTopic)] from broker
>> > [ArrayBuffer(id:0,host:redacted,port:9092)] failed. Retrying.
>> >
>> > The modifications are pretty straightforward. In
>> > wikipedia-parser.properties, I've changed the following:
>> >
>> > task.inputs=kafka.myTopic
>> > systems.kafka.consumer.zookeeper.connect=redacted:2181/
>> > systems.kafka.consumer.auto.offset.reset=smallest
>> > systems.kafka.producer.metadata.broker.list=redacted:9092
>> >
>> > and in the actual java file, WikipediaParserStreamTask.java:
>> >
>> > public void process(IncomingMessageEnvelope envelope, MessageCollector
>> > collector, TaskCoordinator coordinator) {
>> >   Map<String, Object> jsonObject = (Map<String, Object>)
>> >       envelope.getMessage();
>> >   WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
>> >
>> >   try {
>> >     System.out.println(event.getRawEvent());
>> >
>> > And then I followed the compile/extract/run process outlined on the
>> > hello-samza website.
>> >
>> > Any thoughts? I've looked online for any 'super simple' examples of
>> > ingesting Kafka in Samza with very little success.
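One thing that stands out from the thread: the topic contains plain sentences, but the task casts envelope.getMessage() to Map<String, Object>, which is the shape hello-samza's JSON serde produces for the Wikipedia feed. If the payload deserializes to a plain String, that cast throws ClassCastException before the println is ever reached, and the stack trace may only surface in the container logs. A minimal, self-contained sketch of that failure mode (plain Java, no Samza dependencies; CastDemo and the literal message are hypothetical stand-ins):

```java
import java.util.Map;

// Stand-in for the first lines of process(): the envelope's message is a
// plain String (a sentence), but the task casts it to a Map.
public class CastDemo {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        Object message = "this is just a sentence"; // what a string payload yields
        try {
            // Mirrors: Map<String, Object> jsonObject =
            //              (Map<String, Object>) envelope.getMessage();
            Map<String, Object> jsonObject = (Map<String, Object>) message;
            System.out.println(jsonObject);
        } catch (ClassCastException e) {
            // The cast fails at runtime: a String is not a Map.
            System.out.println("caught: " + e);
        }
    }
}
```

So even once logging is sorted out, process() would likely die on the first message. Either publish JSON to 'myTopic' so the existing cast holds, or rewrite the task to treat the message as a String.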