just to clarify, adding:

serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
systems.kafka.streams.myTopic.samza.msg.serde=string
to the property file, and updating the java source to:

System.out.println((String) envelope.getMessage());

did the trick. I've updated the pastebin with the appropriate change.

Now, through this whole process I've assumed that StreamTask is the appropriate class to derive from. Essentially, the end goal is to decode a compressed bytestream Kafka event (from a different topic, of course) and then feed it to a DB/TSDB/whatever. I didn't think that I'd need to generate a Job for this, and that this stream should be able to feed directly to the output entity.

Anyway, over the next couple of days I'll migrate this into something that can live in the hello-samza ecosystem as a separate task. I've got the start of a Java-based DataPusher built as well.

On Mon, Mar 23, 2015 at 9:40 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:

> Huzzah! I ... have ... text showing!
>
> This has been enough of a trial that I think I'll convert this into a very
> simple sample project for the repo, if you guys are interested.
>
> Diff coming once I have it cleaned up into something less ugly.
>
> -Ash
>
> On Mon, Mar 23, 2015 at 9:27 PM, Chinmay Soman <chinmay.cere...@gmail.com>
> wrote:
>
>> >I changed the systems.kafka.samza.msg.serde=json to 'string' a while
>> >back, but that caused a separate exception. However that was many, MANY
>> >attempts ago.
>>
>> This may not work, because that will set all serialization formats (input
>> and output) to json / string. In your case you're inputting string and
>> outputting json, so you might have to set that explicitly.
>>
>> On Mon, Mar 23, 2015 at 9:24 PM, Chinmay Soman <chinmay.cere...@gmail.com>
>> wrote:
>>
>> > Since you're producing String data to 'myTopic', can you try setting the
>> > string serialization in your config?
>> >
>> > serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>> > systems.kafka.streams.myTopic.samza.msg.serde=string
>> >
>> > On Mon, Mar 23, 2015 at 9:17 PM, Ash W Matheson <ash.mathe...@gmail.com>
>> > wrote:
>> >
>> >> more info - new exception message:
>> >>
>> >> Exception in thread "main"
>> >> org.apache.samza.system.SystemConsumersException: Cannot deserialize an
>> >> incoming message.
>> >>
>> >> Updated the diff in pastebin with the changes.
>> >>
>> >> On Mon, Mar 23, 2015 at 8:41 PM, Ash W Matheson <ash.mathe...@gmail.com>
>> >> wrote:
>> >>
>> >> > Gah! Yeah, those were gone several revisions ago but didn't get nuked
>> >> > in the last iteration.
>> >> >
>> >> > OK, let me do a quick test to see if that was my problem all along.
>> >> >
>> >> > On Mon, Mar 23, 2015 at 8:38 PM, Navina Ramesh <
>> >> > nram...@linkedin.com.invalid> wrote:
>> >> >
>> >> >> Hey Ash,
>> >> >> I was referring to the lines before the try block:
>> >> >>
>> >> >> Map<String, Object> jsonObject = (Map<String, Object>)
>> >> >> envelope.getMessage();
>> >> >> WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
>> >> >>
>> >> >> try {
>> >> >>   System.out.println("[DWH] should see this");
>> >> >>   System.out.println(event.getRawEvent());
>> >> >>   …
>> >> >>
>> >> >> Did you remove those lines as well?
>> >> >>
>> >> >> Navina
>> >> >>
>> >> >> On 3/23/15, 8:31 PM, "Ash W Matheson" <ash.mathe...@gmail.com> wrote:
>> >> >>
>> >> >> >Just looking at the diff I posted, and it's:
>> >> >> >
>> >> >> >  1.   try {
>> >> >> >  2. -   Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
>> >> >> >  3. +   System.out.println("[DWH] should see this");
>> >> >> >  4. +   System.out.println(event.getRawEvent());
>> >> >> >  5. +   // Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
>> >> >> >
>> >> >> >I've removed the Map and added two System.out.println calls. So no,
>> >> >> >there shouldn't be any reference to
>> >> >> >Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
>> >> >> >in the source java file.
>> >> >> >
>> >> >> >On Mon, Mar 23, 2015 at 7:42 PM, Ash W Matheson <ash.mathe...@gmail.com>
>> >> >> >wrote:
>> >> >> >
>> >> >> >> I'm in transit right now, but if memory serves me, everything should
>> >> >> >> be commented out of that method except for the System.out.println
>> >> >> >> call. I'll be home shortly and can confirm.
>> >> >> >> On Mar 23, 2015 7:28 PM, "Navina Ramesh" <nram...@linkedin.com.invalid>
>> >> >> >> wrote:
>> >> >> >>
>> >> >> >>> Hi Ash,
>> >> >> >>> I just ran wikipedia-parser with your patch. Looks like you have set
>> >> >> >>> the message serde correctly in the configs. However, the original
>> >> >> >>> code still converts it into a Map for consumption in the
>> >> >> >>> WikipediaFeedEvent. I am seeing the following (expected):
>> >> >> >>>
>> >> >> >>> 2015-03-23 19:17:49 SamzaContainerExceptionHandler [ERROR] Uncaught
>> >> >> >>> exception in thread (name=main). Exiting process now.
>> >> >> >>> java.lang.ClassCastException: java.lang.String cannot be cast to
>> >> >> >>> java.util.Map
>> >> >> >>> at samza.examples.wikipedia.task.WikipediaParserStreamTask.process(WikipediaParserStreamTask.java:38)
>> >> >> >>> at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:133)
>> >> >> >>>
>> >> >> >>> Did you make the changes to fix this error?
>> >> >>> Your patch doesn't seem to have that.
>> >> >>> Line 38: Map<String, Object> jsonObject = (Map<String, Object>)
>> >> >>> envelope.getMessage();
>> >> >>>
>> >> >>> Lmk so I can investigate further.
>> >> >>>
>> >> >>> Cheers!
>> >> >>> Navina
>> >> >>>
>> >> >>> On 3/23/15, 6:43 PM, "Ash W Matheson" <ash.mathe...@gmail.com> wrote:
>> >> >>>
>> >> >>> >If anyone's interested, I've posted a diff of the project here:
>> >> >>> >http://pastebin.com/6ZW6Y1Vu
>> >> >>> >and the python publisher here: http://pastebin.com/2NvTFDFx
>> >> >>> >
>> >> >>> >if you want to take a stab at it.
>> >> >>> >
>> >> >>> >On Mon, Mar 23, 2015 at 6:04 PM, Ash W Matheson <ash.mathe...@gmail.com>
>> >> >>> >wrote:
>> >> >>> >
>> >> >>> >> Ok, so very simple test: all running on a local machine, not across
>> >> >>> >> networks, and all in the hello-samza repo this time around.
>> >> >>> >>
>> >> >>> >> I've got the datapusher.py file set up to push data into localhost,
>> >> >>> >> one event per second, and a modified hello-samza where I've modified
>> >> >>> >> the WikipediaParserStreamTask.java class to simply read what's there.
>> >> >>> >>
>> >> >>> >> Running them both now, and I'm seeing the following in the stderr
>> >> >>> >> files (deploy/yarn/logs/userlogs/application_XXXXX/container_YYYY/stderr):
>> >> >>> >>
>> >> >>> >> Exception in thread "main"
>> >> >>> >> org.apache.samza.system.SystemConsumersException: Cannot deserialize an
>> >> >>> >> incoming message.
>> >> >>> >> at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:293)
>> >> >>> >> at org.apache.samza.system.SystemConsumers.org$apache$samza$system$SystemConsumers$$poll(SystemConsumers.scala:260)
>> >> >>> >> at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
>> >> >>> >> at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
>> >> >>> >> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> >> >>> >> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> >> >>> >> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >> >>> >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >> >>> >> at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
>> >> >>> >> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>> >> >>> >> at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
>> >> >>> >> at scala.collection.SetLike$class.map(SetLike.scala:93)
>> >> >>> >> at scala.collection.AbstractSet.map(Set.scala:47)
>> >> >>> >> at org.apache.samza.system.SystemConsumers.refresh(SystemConsumers.scala:276)
>> >> >>> >> at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:213)
>> >> >>> >> at org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(RunLoop.scala:81)
>> >> >>> >> at org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(RunLoop.scala:81)
>> >> >>> >> at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>> >> >>> >> at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>> >> >>> >> at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:80)
>> >> >>> >> at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>> >> >>> >> at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>> >> >>> >> at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
>> >> >>> >> at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
>> >> >>> >> at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
>> >> >>> >> at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
>> >> >>> >> at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
>> >> >>> >> at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>> >> >>> >> Caused by: org.codehaus.jackson.JsonParseException: Unexpected character
>> >> >>> >> ('M' (code 77)): expected a valid value (number, String, array, object,
>> >> >>> >> 'true', 'false' or 'null')
>> >> >>> >> at [Source: [B@5454d285; line: 1, column: 2]
>> >> >>> >> at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
>> >> >>> >> at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:385)
>> >> >>> >> at org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(JsonParserMinimalBase.java:306)
>> >> >>> >> at org.codehaus.jackson.impl.Utf8StreamParser._handleUnexpectedValue(Utf8StreamParser.java:1581)
>> >> >>> >> at org.codehaus.jackson.impl.Utf8StreamParser._nextTokenNotInObject(Utf8StreamParser.java:436)
>> >> >>> >> at org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamParser.java:322)
>> >> >>> >> at org.codehaus.jackson.map.ObjectMapper._initForReading(ObjectMapper.java:2432)
>> >> >>> >> at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2389)
>> >> >>> >> at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1667)
>> >> >>> >> at org.apache.samza.serializers.JsonSerde.fromBytes(JsonSerde.scala:33)
>> >> >>> >> at org.apache.samza.serializers.SerdeManager.fromBytes(SerdeManager.scala:115)
>> >> >>> >> at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:290)
>> >> >>> >>
>> >> >>> >> I changed the systems.kafka.samza.msg.serde=json to 'string' a while
>> >> >>> >> back, but that caused a separate exception. However, that was many,
>> >> >>> >> MANY attempts ago.
>> >> >> >>> >> >> >> >> >>> >> On Mon, Mar 23, 2015 at 5:23 PM, Ash W Matheson < >> >> >> >>> ash.mathe...@gmail.com> >> >> >> >>> >> wrote: >> >> >> >>> >> >> >> >> >>> >>> Ahh, I was going to add it to the run-class.sh script. >> >> >> >>> >>> >> >> >> >>> >>> Yeah, it's already there by default: >> >> >> >>> >>> >> >> >> >>> >>> >> >> >> >>> >>> # Metrics >> >> >> >>> >>> metrics.reporters=snapshot,jmx >> >> >> >>> >>> >> >> >> >>> >>> >> >> >> >>> >> >> >> >>> >> >> >> >> >> >> >> >> >> >>>>>>metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.Met >> >> >> >>>>>>ric >> >> >> >>> >>>sSnapshotReporterFactory >> >> >> >>> >>> metrics.reporter.snapshot.stream=kafka.metrics >> >> >> >>> >>> >> >> >> >>> >>> >> >> >> >>> >> >> >> >>> >> >> >> >> >> >> >> >> >> >>>>>>metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxRepor >> >> >> >>>>>>ter >> >> >> >>> >>>Factory >> >> >> >>> >>> >> >> >> >>> >>> So, where would I see those metrics? >> >> >> >>> >>> >> >> >> >>> >>> On Mon, Mar 23, 2015 at 5:15 PM, Ash W Matheson >> >> >> >>> >>><ash.mathe...@gmail.com> >> >> >> >>> >>> wrote: >> >> >> >>> >>> >> >> >> >>> >>>> read: I'm a C++ programmer looking at Java for the first >> time >> >> in >> >> >> >>>> 10 >> >> >> >>> >>>> years >> >> >> >>> >>>> >> >> >> >>> >>>> On Mon, Mar 23, 2015 at 5:13 PM, Ash W Matheson >> >> >> >>> >>>><ash.mathe...@gmail.com> >> >> >> >>> >>>> wrote: >> >> >> >>> >>>> >> >> >> >>> >>>>> I'm assuming I have Jmx defined ... where would that get >> set? >> >> >> >>> >>>>> >> >> >> >>> >>>>> On Mon, Mar 23, 2015 at 5:08 PM, Chinmay Soman < >> >> >> >>> >>>>> chinmay.cere...@gmail.com> wrote: >> >> >> >>> >>>>> >> >> >> >>> >>>>>> Hey Ash, >> >> >> >>> >>>>>> >> >> >> >>> >>>>>> Can you see your job metrics (if you have the Jmx metrics >> >> >> >>>defined) >> >> >> >>> >>>>>>to >> >> >> >>> >>>>>> see >> >> >> >>> >>>>>> if your job is actually doing anything ? 
My only guess at >> >> this >> >> >> >>> point >> >> >> >>> >>>>>> is the >> >> >> >>> >>>>>> process method is not being called because somehow >> there's >> >> no >> >> >> >>> >>>>>>incoming >> >> >> >>> >>>>>> data. I could be totally wrong of course. >> >> >> >>> >>>>>> >> >> >> >>> >>>>>> On Mon, Mar 23, 2015 at 4:28 PM, Ash W Matheson < >> >> >> >>> >>>>>> ash.mathe...@gmail.com> >> >> >> >>> >>>>>> wrote: >> >> >> >>> >>>>>> >> >> >> >>> >>>>>> > Just to be clear, here's what's changed from the >> default >> >> >> >>> >>>>>>hello-samza >> >> >> >>> >>>>>> repo: >> >> >> >>> >>>>>> > >> >> >> >>> >>>>>> > wikipedia-parser.properties========================== >> >> >> >>> >>>>>> > task.inputs=kafka.myTopic >> >> >> >>> >>>>>> > systems.kafka.consumer.zookeeper.connect= >> >> >> >>> >>>>>> > ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com:2181/ >> >> >> >>> >>>>>> > systems.kafka.consumer.auto.offset.reset=smallest >> >> >> >>> >>>>>> > >> >> >> >>> >>>>>> > WikipediaParserStreamTask.java ===================== >> >> >> >>> >>>>>> > public void process(IncomingMessageEnvelope envelope, >> >> >> >>> >>>>>> MessageCollector >> >> >> >>> >>>>>> > collector, TaskCoordinator coordinator) { >> >> >> >>> >>>>>> > Map<String, Object> jsonObject = (Map<String, >> Object>) >> >> >> >>> >>>>>> > envelope.getMessage(); >> >> >> >>> >>>>>> > WikipediaFeedEvent event = new >> >> >> >>> WikipediaFeedEvent(jsonObject); >> >> >> >>> >>>>>> > >> >> >> >>> >>>>>> > try { >> >> >> >>> >>>>>> > System.out.println(event.getRawEvent()); >> >> >> >>> >>>>>> > // Map<String, Object> parsedJsonObject = >> >> >> >>> >>>>>> parse(event.getRawEvent()); >> >> >> >>> >>>>>> > >> >> >> >>> >>>>>> > // parsedJsonObject.put("channel", >> >> event.getChannel()); >> >> >> >>> >>>>>> > // parsedJsonObject.put("source", >> >> event.getSource()); >> >> >> >>> >>>>>> > // parsedJsonObject.put("time", event.getTime()); >> >> >> >>> >>>>>> > >> >> >> >>> >>>>>> > // collector.send(new 
OutgoingMessageEnvelope(new >> >> >> >>> >>>>>> > SystemStream("kafka", "wikipedia-edits"), >> >> parsedJsonObject)); >> >> >> >>> >>>>>> > >> >> >> >>> >>>>>> > as well as the aforementioned changes to the log4j.xml >> >> file. >> >> >> >>> >>>>>> > >> >> >> >>> >>>>>> > The data pushed into the 'myTopic' topic is nothing >> more >> >> than >> >> >> >>>a >> >> >> >>> >>>>>> sentence. >> >> >> >>> >>>>>> > >> >> >> >>> >>>>>> > >> >> >> >>> >>>>>> > On Mon, Mar 23, 2015 at 4:16 PM, Ash W Matheson < >> >> >> >>> >>>>>> ash.mathe...@gmail.com> >> >> >> >>> >>>>>> > wrote: >> >> >> >>> >>>>>> > >> >> >> >>> >>>>>> > > yep, modified log4j.xml to look like this: >> >> >> >>> >>>>>> > > >> >> >> >>> >>>>>> > > <root> >> >> >> >>> >>>>>> > > <priority value="debug" /> >> >> >> >>> >>>>>> > > <appender-ref ref="RollingAppender"/> >> >> >> >>> >>>>>> > > <appender-ref ref="jmx" /> >> >> >> >>> >>>>>> > > </root> >> >> >> >>> >>>>>> > > >> >> >> >>> >>>>>> > > Not sure what you mean by #2. >> >> >> >>> >>>>>> > > >> >> >> >>> >>>>>> > > However, I'm running now, not seeing any exceptions, >> but >> >> >> >>>still >> >> >> >>> >>>>>>not >> >> >> >>> >>>>>> seeing >> >> >> >>> >>>>>> > > any output from System.out.println(...) >> >> >> >>> >>>>>> > > >> >> >> >>> >>>>>> > > On Mon, Mar 23, 2015 at 11:29 AM, Naveen >> Somasundaram < >> >> >> >>> >>>>>> > > nsomasunda...@linkedin.com.invalid> wrote: >> >> >> >>> >>>>>> > > >> >> >> >>> >>>>>> > >> Hey Ash, >> >> >> >>> >>>>>> > >> 1. Did you happen to modify your >> >> log4j.xml >> >> >> ? >> >> >> >>> >>>>>> > >> 2. Can you print the class path that >> was >> >> >> >>> printed >> >> >> >>> >>>>>> when the >> >> >> >>> >>>>>> > >> job started ? I am wondering if log4j was not >> loaded or >> >> >> not >> >> >> >>> >>>>>> present in >> >> >> >>> >>>>>> > the >> >> >> >>> >>>>>> > >> path where it¹s looking for. 
>> >> >>> >>>>>> > >> If you have been using hello-samza, it should have pulled it
>> >> >>> >>>>>> > >> from Maven.
>> >> >>> >>>>>> > >>
>> >> >>> >>>>>> > >> Thanks,
>> >> >>> >>>>>> > >> Naveen
>> >> >>> >>>>>> > >>
>> >> >>> >>>>>> > >> On Mar 22, 2015, at 10:35 AM, Ash W Matheson <ash.mathe...@gmail.com>
>> >> >>> >>>>>> > >> wrote:
>> >> >>> >>>>>> > >>
>> >> >>> >>>>>> > >> > Hey all,
>> >> >>> >>>>>> > >> >
>> >> >>> >>>>>> > >> > Evaluating Samza currently, and am running into some odd issues.
>> >> >>> >>>>>> > >> >
>> >> >>> >>>>>> > >> > I'm currently working off the 'hello-samza' repo and trying to
>> >> >>> >>>>>> > >> > parse a simple kafka topic that I've produced through an external
>> >> >>> >>>>>> > >> > java app (nothing other than a series of sentences), and it's
>> >> >>> >>>>>> > >> > failing pretty hard for me. The base 'hello-samza' set of apps
>> >> >>> >>>>>> > >> > works fine, but as soon as I change the configuration to look at
>> >> >>> >>>>>> > >> > a different Kafka/zookeeper, I get the following in the userlogs:
>> >> >>> >>>>>> > >> >
>> >> >>> >>>>>> > >> > 2015-03-22 17:07:09 KafkaSystemAdmin [WARN] Unable to fetch last
>> >> >>> >>>>>> > >> > offsets for streams [myTopic] due to kafka.common.KafkaException:
>> >> >>> >>>>>> > >> > fetching topic metadata for topics [Set(myTopic)] from broker
>> >> >>> >>>>>> > >> > [ArrayBuffer(id:0,host:redacted,port:9092)] failed. Retrying.
>> >> >>> >>>>>> > >> >
>> >> >>> >>>>>> > >> > The modifications are pretty straightforward. In
>> >> >>> >>>>>> > >> > wikipedia-parser.properties, I've changed the following:
>> >> >>> >>>>>> > >> >
>> >> >>> >>>>>> > >> > task.inputs=kafka.myTopic
>> >> >>> >>>>>> > >> > systems.kafka.consumer.zookeeper.connect=redacted:2181/
>> >> >>> >>>>>> > >> > systems.kafka.consumer.auto.offset.reset=smallest
>> >> >>> >>>>>> > >> > systems.kafka.producer.metadata.broker.list=redacted:9092
>> >> >>> >>>>>> > >> >
>> >> >>> >>>>>> > >> > and in the actual java file, WikipediaParserStreamTask.java:
>> >> >>> >>>>>> > >> >
>> >> >>> >>>>>> > >> > public void process(IncomingMessageEnvelope envelope, MessageCollector
>> >> >>> >>>>>> > >> > collector, TaskCoordinator coordinator) {
>> >> >>> >>>>>> > >> >   Map<String, Object> jsonObject = (Map<String, Object>)
>> >> >>> >>>>>> > >> > envelope.getMessage();
>> >> >>> >>>>>> > >> >   WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
>> >> >>> >>>>>> > >> >
>> >> >>> >>>>>> > >> >   try {
>> >> >>> >>>>>> > >> >     System.out.println(event.getRawEvent());
>> >> >>> >>>>>> > >> >
>> >> >>> >>>>>> > >> > And then I followed the compile/extract/run process outlined on
>> >> >>> >>>>>> > >> > the hello-samza website.
>> >> >>> >>>>>> > >> >
>> >> >>> >>>>>> > >> > Any thoughts? I've looked online for any 'super simple' examples
>> >> >>> >>>>>> > >> > of ingesting kafka in samza with very little success.
>> >> >> >>> >>>>>> > >> >> >> >> >>> >>>>>> > >> >> >> >> >>> >>>>>> > > >> >> >> >>> >>>>>> > >> >> >> >>> >>>>>> >> >> >> >>> >>>>>> >> >> >> >>> >>>>>> >> >> >> >>> >>>>>> -- >> >> >> >>> >>>>>> Thanks and regards >> >> >> >>> >>>>>> >> >> >> >>> >>>>>> Chinmay Soman >> >> >> >>> >>>>>> >> >> >> >>> >>>>> >> >> >> >>> >>>>> >> >> >> >>> >>>> >> >> >> >>> >>> >> >> >> >>> >> >> >> >> >>> >> >> >> >>> >> >> >> >> >> >> >> >> > >> >> >> > >> > >> > >> > -- >> > Thanks and regards >> > >> > Chinmay Soman >> > >> >> >> >> -- >> Thanks and regards >> >> Chinmay Soman >> > >