I'm in transit right now, but if memory serves, everything in that method should be commented out except for the System.out.println call. I'll be home shortly and can confirm.

On Mar 23, 2015 7:28 PM, "Navina Ramesh" <nram...@linkedin.com.invalid> wrote:
> Hi Ash,
> I just ran wikipedia-parser with your patch. Looks like you have set the
> message serde correctly in the configs. However, the original code still
> converts it into a Map for consumption in the WikipediaFeedEvent.
> I am seeing the following (expected):
>
> 2015-03-23 19:17:49 SamzaContainerExceptionHandler [ERROR] Uncaught exception in thread (name=main). Exiting process now.
> java.lang.ClassCastException: java.lang.String cannot be cast to java.util.Map
>   at samza.examples.wikipedia.task.WikipediaParserStreamTask.process(WikipediaParserStreamTask.java:38)
>   at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:133)
>
> Did you make the changes to fix this error? Your patch doesn't seem to
> have that.
> Line 38: Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
>
> Lmk so I can investigate further.
>
> Cheers!
> Navina
>
> On 3/23/15, 6:43 PM, "Ash W Matheson" <ash.mathe...@gmail.com> wrote:
>
> >If anyone's interested, I've posted a diff of the project here:
> >http://pastebin.com/6ZW6Y1Vu
> >and the python publisher here: http://pastebin.com/2NvTFDFx
> >
> >if you want to take a stab at it.
> >
> >On Mon, Mar 23, 2015 at 6:04 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
> >
> >> Ok, so very simple test, all running on a local machine, not across
> >> networks and all in the hello-samza repo this time around.
> >>
> >> I've got the datapusher.py file set up to push data into localhost. One
> >> event per second.
> >> And a modified hello-samza where I've modified the
> >> WikipediaParserStreamTask.java class to simply read what's there.
> >>
> >> Running them both now and I'm seeing in the stderr files
> >> (deploy/yarn/logs/userlogs/application_XXXXX/container_YYYY/stderr) the
> >> following:
> >>
> >> Exception in thread "main" org.apache.samza.system.SystemConsumersException: Cannot deserialize an incoming message.
> >>   at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:293)
> >>   at org.apache.samza.system.SystemConsumers.org$apache$samza$system$SystemConsumers$$poll(SystemConsumers.scala:260)
> >>   at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
> >>   at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
> >>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >>   at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
> >>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> >>   at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
> >>   at scala.collection.SetLike$class.map(SetLike.scala:93)
> >>   at scala.collection.AbstractSet.map(Set.scala:47)
> >>   at org.apache.samza.system.SystemConsumers.refresh(SystemConsumers.scala:276)
> >>   at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:213)
> >>   at org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(RunLoop.scala:81)
> >>   at org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(RunLoop.scala:81)
> >>   at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
> >>   at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
> >>   at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:80)
> >>   at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
> >>   at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
> >>   at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
> >>   at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
> >>   at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
> >>   at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
> >>   at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
> >>   at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> >> Caused by: org.codehaus.jackson.JsonParseException: Unexpected character ('M' (code 77)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
> >>   at [Source: [B@5454d285; line: 1, column: 2]
> >>   at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
> >>   at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:385)
> >>   at org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(JsonParserMinimalBase.java:306)
> >>   at org.codehaus.jackson.impl.Utf8StreamParser._handleUnexpectedValue(Utf8StreamParser.java:1581)
> >>   at org.codehaus.jackson.impl.Utf8StreamParser._nextTokenNotInObject(Utf8StreamParser.java:436)
> >>   at org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamParser.java:322)
> >>   at org.codehaus.jackson.map.ObjectMapper._initForReading(ObjectMapper.java:2432)
> >>   at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2389)
> >>   at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1667)
> >>   at org.apache.samza.serializers.JsonSerde.fromBytes(JsonSerde.scala:33)
> >>   at org.apache.samza.serializers.SerdeManager.fromBytes(SerdeManager.scala:115)
> >>   at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:290)
> >>
> >>
> >> I changed the systems.kafka.samza.msg.serde=json to 'string' a while back,
> >> but that caused a separate exception. However that was many, MANY attempts
> >> ago.
> >>
> >> On Mon, Mar 23, 2015 at 5:23 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
> >>
> >>> Ahh, I was going to add it to the run-class.sh script.
> >>>
> >>> Yeah, it's already there by default:
> >>>
> >>> # Metrics
> >>> metrics.reporters=snapshot,jmx
> >>> metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
> >>> metrics.reporter.snapshot.stream=kafka.metrics
> >>> metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
> >>>
> >>> So, where would I see those metrics?
> >>>
> >>> On Mon, Mar 23, 2015 at 5:15 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
> >>>
> >>>> read: I'm a C++ programmer looking at Java for the first time in > 10 years
> >>>>
> >>>> On Mon, Mar 23, 2015 at 5:13 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
> >>>>
> >>>>> I'm assuming I have Jmx defined ... where would that get set?
> >>>>>
> >>>>> On Mon, Mar 23, 2015 at 5:08 PM, Chinmay Soman <chinmay.cere...@gmail.com> wrote:
> >>>>>
> >>>>>> Hey Ash,
> >>>>>>
> >>>>>> Can you see your job metrics (if you have the Jmx metrics defined) to see
> >>>>>> if your job is actually doing anything ? My only guess at this point is the
> >>>>>> process method is not being called because somehow there's no incoming
> >>>>>> data. I could be totally wrong of course.
> >>>>>>
> >>>>>> On Mon, Mar 23, 2015 at 4:28 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
> >>>>>>
> >>>>>> > Just to be clear, here's what's changed from the default hello-samza repo:
> >>>>>> >
> >>>>>> > wikipedia-parser.properties ==========================
> >>>>>> > task.inputs=kafka.myTopic
> >>>>>> > systems.kafka.consumer.zookeeper.connect=ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com:2181/
> >>>>>> > systems.kafka.consumer.auto.offset.reset=smallest
> >>>>>> >
> >>>>>> > WikipediaParserStreamTask.java =====================
> >>>>>> >   public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
> >>>>>> >     Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
> >>>>>> >     WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
> >>>>>> >
> >>>>>> >     try {
> >>>>>> >       System.out.println(event.getRawEvent());
> >>>>>> >       // Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
> >>>>>> >
> >>>>>> >       // parsedJsonObject.put("channel", event.getChannel());
> >>>>>> >       // parsedJsonObject.put("source", event.getSource());
> >>>>>> >       // parsedJsonObject.put("time", event.getTime());
> >>>>>> >
> >>>>>> >       // collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-edits"), parsedJsonObject));
> >>>>>> >
> >>>>>> > as well as the aforementioned changes to the log4j.xml file.
> >>>>>> >
> >>>>>> > The data pushed into the 'myTopic' topic is nothing more than a sentence.
> >>>>>> >
> >>>>>> >
> >>>>>> > On Mon, Mar 23, 2015 at 4:16 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
> >>>>>> >
> >>>>>> > > yep, modified log4j.xml to look like this:
> >>>>>> > >
> >>>>>> > >   <root>
> >>>>>> > >     <priority value="debug" />
> >>>>>> > >     <appender-ref ref="RollingAppender"/>
> >>>>>> > >     <appender-ref ref="jmx" />
> >>>>>> > >   </root>
> >>>>>> > >
> >>>>>> > > Not sure what you mean by #2.
> >>>>>> > >
> >>>>>> > > However, I'm running now, not seeing any exceptions, but still not seeing
> >>>>>> > > any output from System.out.println(...)
> >>>>>> > >
> >>>>>> > > On Mon, Mar 23, 2015 at 11:29 AM, Naveen Somasundaram <nsomasunda...@linkedin.com.invalid> wrote:
> >>>>>> > >
> >>>>>> > >> Hey Ash,
> >>>>>> > >> 1. Did you happen to modify your log4j.xml ?
> >>>>>> > >> 2. Can you print the class path that was printed when the
> >>>>>> > >> job started ? I am wondering if log4j was not loaded or is not present in
> >>>>>> > >> the path where it's being looked for. If you have been using hello samza,
> >>>>>> > >> it should have pulled it from Maven.
> >>>>>> > >>
> >>>>>> > >> Thanks,
> >>>>>> > >> Naveen
> >>>>>> > >>
> >>>>>> > >> On Mar 22, 2015, at 10:35 AM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
> >>>>>> > >>
> >>>>>> > >> > Hey all,
> >>>>>> > >> >
> >>>>>> > >> > Evaluating Samza currently and am running into some odd issues.
> >>>>>> > >> >
> >>>>>> > >> > I'm currently working off the 'hello-samza' repo and trying to parse a
> >>>>>> > >> > simple kafka topic that I've produced through an external java app (nothing
> >>>>>> > >> > other than a series of sentences) and it's failing pretty hard for me. The
> >>>>>> > >> > base 'hello-samza' set of apps works fine, but as soon as I change the
> >>>>>> > >> > configuration to look at a different Kafka/zookeeper I get the following in
> >>>>>> > >> > the userlogs:
> >>>>>> > >> >
> >>>>>> > >> > 2015-03-22 17:07:09 KafkaSystemAdmin [WARN] Unable to fetch last offsets for streams [myTopic] due to kafka.common.KafkaException: fetching topic metadata for topics [Set(myTopic)] from broker [ArrayBuffer(id:0,host:redacted,port:9092)] failed. Retrying.
> >>>>>> > >> >
> >>>>>> > >> >
> >>>>>> > >> > The modifications are pretty straightforward. In the
> >>>>>> > >> > wikipedia-parser.properties, I've changed the following:
> >>>>>> > >> > task.inputs=kafka.myTopic
> >>>>>> > >> > systems.kafka.consumer.zookeeper.connect=redacted:2181/
> >>>>>> > >> > systems.kafka.consumer.auto.offset.reset=smallest
> >>>>>> > >> > systems.kafka.producer.metadata.broker.list=redacted:9092
> >>>>>> > >> >
> >>>>>> > >> > and in the actual java file WikipediaParserStreamTask.java
> >>>>>> > >> >   public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
> >>>>>> > >> >     Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
> >>>>>> > >> >     WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
> >>>>>> > >> >
> >>>>>> > >> >     try {
> >>>>>> > >> >       System.out.println(event.getRawEvent());
> >>>>>> > >> >
> >>>>>> > >> > And then following the compile/extract/run process outlined in the
> >>>>>> > >> > hello-samza website.
> >>>>>> > >> >
> >>>>>> > >> > Any thoughts? I've looked online for any 'super simple' examples of
> >>>>>> > >> > ingesting kafka in samza with very little success.
> >>>>>> > >>
> >>>>>> > >>
> >>>>>> > >
> >>>>>> >
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> Thanks and regards
> >>>>>>
> >>>>>> Chinmay Soman
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
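
For reference, here is a minimal sketch of one way around the ClassCastException reported at line 38 in the thread above. It assumes only what the thread shows: the deserialized message arrives as a plain Object (in the Samza task it would come from envelope.getMessage()), and its runtime type depends on the configured serde, a String with the 'string' serde and a Map with the 'json' serde. The MessageText class and the describe helper are hypothetical names used for illustration; they are not part of hello-samza or Samza.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MessageText {
    /**
     * Returns a printable form of a deserialized message without assuming its type.
     * Hypothetical helper: in the Samza task the argument would be envelope.getMessage().
     */
    public static String describe(Object message) {
        if (message instanceof String) {
            // String serde: the payload is already plain text.
            return (String) message;
        }
        if (message instanceof Map) {
            // JSON serde: the payload was parsed into a map of fields.
            return "json fields: " + ((Map<?, ?>) message).keySet();
        }
        // Anything else (including null) is reported instead of being cast blindly.
        return "unsupported message type: "
                + (message == null ? "null" : message.getClass().getName());
    }

    public static void main(String[] args) {
        // A plain sentence, like the data pushed into 'myTopic'.
        System.out.println(describe("My plain sentence"));

        // A map, like the one the original task expects from the JSON serde.
        Map<String, Object> event = new LinkedHashMap<>();
        event.put("channel", "#en.wikipedia");
        System.out.println(describe(event));
    }
}
```

Checking the type first means the task keeps running whichever serde is configured, and the branch that fires tells you what the consumer actually handed over. Note that this does not help with the earlier JsonParseException: with the 'json' serde a plain sentence fails during deserialization, before process is ever called.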