Just to be clear, here's what's changed from the default hello-samza repo:

wikipedia-parser.properties==========================
task.inputs=kafka.myTopic
systems.kafka.consumer.zookeeper.connect=
ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com:2181/
systems.kafka.consumer.auto.offset.reset=smallest

WikipediaParserStreamTask.java =====================
  public void process(IncomingMessageEnvelope envelope, MessageCollector
collector, TaskCoordinator coordinator) {
    Map<String, Object> jsonObject = (Map<String, Object>)
envelope.getMessage();
    WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);

    try {
      System.out.println(event.getRawEvent());
      // Map<String, Object> parsedJsonObject = parse(event.getRawEvent());

      // parsedJsonObject.put("channel", event.getChannel());
      // parsedJsonObject.put("source", event.getSource());
      // parsedJsonObject.put("time", event.getTime());

      // collector.send(new OutgoingMessageEnvelope(new
SystemStream("kafka", "wikipedia-edits"), parsedJsonObject));

as well as the aforementioned changes to the log4j.xml file.

The data pushed into the 'myTopic' topic is nothing more than a sentence.


On Mon, Mar 23, 2015 at 4:16 PM, Ash W Matheson <ash.mathe...@gmail.com>
wrote:

> yep, modified log4j.xml to look like this:
>
>   <root>
>     <priority value="debug" />
>     <appender-ref ref="RollingAppender"/>
>     <appender-ref ref="jmx" />
>   </root>
>
> Not sure what you mean by #2.
>
> However, I'm running now, not seeing any exceptions, but still not seeing
> any output from System.out.println(...)
>
> On Mon, Mar 23, 2015 at 11:29 AM, Naveen Somasundaram <
> nsomasunda...@linkedin.com.invalid> wrote:
>
>> Hey Ash,
>>                1. Did you happen to modify your log4j.xml ?
>>                2. Can you print the class path that was printed when the
>> job started ? I am wondering if log4j was not loaded or not present in the
>> path where it’s looking for. If you have been using hello samza, it should
>> have pulled it from Maven.
>>
>> Thanks,
>> Naveen
>>
>> On Mar 22, 2015, at 10:35 AM, Ash W Matheson <ash.mathe...@gmail.com>
>> wrote:
>>
>> > Hey all,
>> >
>> > Evaluating Samza currently and am running into some odd issues.
>> >
>> > I'm currently working off the 'hello-samza' repo and trying to parse a
>> > simple kafka topic that I've produced through an extenal java app
>> (nothing
>> > other than a series of sentences) and it's failing pretty hard for me.
>> The
>> > base 'hello-samza' set of apps works fine, but as soon as I change the
>> > configuration to look at a different Kafka/zookeeper I get the
>> following in
>> > the userlogs:
>> >
>> > 2015-03-22 17:07:09 KafkaSystemAdmin [WARN] Unable to fetch last offsets
>> > for streams [myTopic] due to kafka.common.KafkaException: fetching topic
>> > metadata for topics [Set(myTopic)] from broker
>> > [ArrayBuffer(id:0,host:redacted,port:9092)] failed. Retrying.
>> >
>> >
>> > The modifications are pretty straightforward.  In the
>> > Wikipedia-parser.properties, I've changed the following:
>> > task.inputs=kafka.myTopic
>> > systems.kafka.consumer.zookeeper.connect=redacted:2181/
>> > systems.kafka.consumer.auto.offset.reset=smallest
>> > systems.kafka.producer.metadata.broker.list=redacted:9092
>> >
>> > and in the actual java file WikipediaParserStreamTask.java
>> >  public void process(IncomingMessageEnvelope envelope, MessageCollector
>> > collector, TaskCoordinator coordinator) {
>> >    Map<String, Object> jsonObject = (Map<String, Object>)
>> > envelope.getMessage();
>> >    WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
>> >
>> >    try {
>> >        System.out.println(event.getRawEvent());
>> >
>> > And then following the compile/extract/run process outlined in the
>> > hello-samza website.
>> >
>> > Any thoughts?  I've looked online for any 'super simple' examples of
>> > ingesting kafka in samza with very little success.
>>
>>
>

Reply via email to