Just looking at the diff I posted and it's:
 try {
-    Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
+    System.out.println("[DWH] should see this");
+    System.out.println(event.getRawEvent());
+    // Map<String, Object> parsedJsonObject = parse(event.getRawEvent());

I've removed the Map and added two System.out.println calls. So no, there shouldn't be any reference to Map<String, Object> parsedJsonObject = parse(event.getRawEvent()); in the source Java file.

On Mon, Mar 23, 2015 at 7:42 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:

> I'm in transit right now, but if memory serves, everything should be
> commented out of that method except for the System.out.println call. I'll
> be home shortly and can confirm.
>
> On Mar 23, 2015 7:28 PM, "Navina Ramesh" <nram...@linkedin.com.invalid>
> wrote:
>
>> Hi Ash,
>> I just ran wikipedia-parser with your patch. It looks like you have set the
>> message serde correctly in the configs. However, the original code still
>> converts the message into a Map for consumption in the WikipediaFeedEvent.
>> I am seeing the following (expected):
>>
>> 2015-03-23 19:17:49 SamzaContainerExceptionHandler [ERROR] Uncaught
>> exception in thread (name=main). Exiting process now.
>> java.lang.ClassCastException: java.lang.String cannot be cast to java.util.Map
>>     at samza.examples.wikipedia.task.WikipediaParserStreamTask.process(WikipediaParserStreamTask.java:38)
>>     at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:133)
>>
>> Did you make the changes to fix this error? Your patch doesn't seem to
>> have them.
>> Line 38: Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
>>
>> Let me know so I can investigate further.
>>
>> Cheers!
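Navina's trace shows the cast on line 38 failing: with the string serde, the message handed to process() is a java.lang.String, so casting it to a Map throws before any of the commented-out code inside the try block runs. Below is a minimal sketch of a type check that avoids the blind cast, assuming the task only needs the raw sentence. MessageNormalizer, toMap, blindCastFails and the "raw" key are hypothetical names made up for this illustration; in a real task the argument would be envelope.getMessage().

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of Samza or hello-samza): turns whatever
// envelope.getMessage() returned into a Map instead of casting blindly.
final class MessageNormalizer {

    // Assumed key name for the wrapped sentence; not taken from hello-samza.
    static final String RAW_KEY = "raw";

    @SuppressWarnings("unchecked")
    static Map<String, Object> toMap(Object message) {
        if (message instanceof Map) {
            // json serde: the payload was already deserialized into a Map.
            return (Map<String, Object>) message;
        }
        if (message instanceof String) {
            // string serde: wrap the plain sentence under RAW_KEY.
            Map<String, Object> wrapped = new HashMap<>();
            wrapped.put(RAW_KEY, message);
            return wrapped;
        }
        throw new IllegalArgumentException("Unexpected message type: "
            + (message == null ? "null" : message.getClass().getName()));
    }

    // Reproduces the failure from line 38: casting a String to Map throws.
    @SuppressWarnings("unchecked")
    static boolean blindCastFails(Object message) {
        try {
            Map<String, Object> cast = (Map<String, Object>) message;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Object fromStringSerde = "My example sentence.";
        System.out.println(blindCastFails(fromStringSerde));     // true
        System.out.println(toMap(fromStringSerde).get(RAW_KEY)); // My example sentence.
    }
}
```

Wrapping the string keeps the rest of a Map-based task unchanged; if nothing downstream needs a Map, treating the message as a String directly is the simpler choice.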
>> Navina
>>
>> On 3/23/15, 6:43 PM, "Ash W Matheson" <ash.mathe...@gmail.com> wrote:
>>
>>> If anyone's interested, I've posted a diff of the project here:
>>> http://pastebin.com/6ZW6Y1Vu
>>> and the Python publisher here: http://pastebin.com/2NvTFDFx
>>>
>>> if you want to take a stab at it.
>>>
>>> On Mon, Mar 23, 2015 at 6:04 PM, Ash W Matheson <ash.mathe...@gmail.com>
>>> wrote:
>>>
>>>> Ok, so a very simple test, all running on a local machine, not across
>>>> networks, and all in the hello-samza repo this time around.
>>>>
>>>> I've got the datapusher.py file set up to push data into localhost, one
>>>> event per second,
>>>> and a modified hello-samza in which I've changed the
>>>> WikipediaParserStreamTask.java class to simply read what's there.
>>>>
>>>> Running them both now, I'm seeing the following in the stderr files
>>>> (deploy/yarn/logs/userlogs/application_XXXXX/container_YYYY/stderr):
>>>>
>>>> Exception in thread "main"
>>>> org.apache.samza.system.SystemConsumersException: Cannot deserialize an
>>>> incoming message.
>>>>     at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:293)
>>>>     at org.apache.samza.system.SystemConsumers.org$apache$samza$system$SystemConsumers$$poll(SystemConsumers.scala:260)
>>>>     at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
>>>>     at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
>>>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>     at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
>>>>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>>>     at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
>>>>     at scala.collection.SetLike$class.map(SetLike.scala:93)
>>>>     at scala.collection.AbstractSet.map(Set.scala:47)
>>>>     at org.apache.samza.system.SystemConsumers.refresh(SystemConsumers.scala:276)
>>>>     at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:213)
>>>>     at org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(RunLoop.scala:81)
>>>>     at org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(RunLoop.scala:81)
>>>>     at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>>>>     at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>>>>     at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:80)
>>>>     at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>>>>     at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>>>>     at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
>>>>     at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
>>>>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
>>>>     at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
>>>>     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
>>>>     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>>>> Caused by: org.codehaus.jackson.JsonParseException: Unexpected character
>>>> ('M' (code 77)): expected a valid value (number, String, array, object,
>>>> 'true', 'false' or 'null')
>>>>  at [Source: [B@5454d285; line: 1, column: 2]
>>>>     at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
>>>>     at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:385)
>>>>     at org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(JsonParserMinimalBase.java:306)
>>>>     at org.codehaus.jackson.impl.Utf8StreamParser._handleUnexpectedValue(Utf8StreamParser.java:1581)
>>>>     at org.codehaus.jackson.impl.Utf8StreamParser._nextTokenNotInObject(Utf8StreamParser.java:436)
>>>>     at org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamParser.java:322)
>>>>     at org.codehaus.jackson.map.ObjectMapper._initForReading(ObjectMapper.java:2432)
>>>>     at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2389)
>>>>     at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1667)
>>>>     at org.apache.samza.serializers.JsonSerde.fromBytes(JsonSerde.scala:33)
>>>>     at org.apache.samza.serializers.SerdeManager.fromBytes(SerdeManager.scala:115)
>>>>     at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:290)
>>>>
>>>> I changed systems.kafka.samza.msg.serde=json to 'string' a while back,
>>>> but that caused a separate exception. However, that was many, MANY
>>>> attempts ago.
>>>>
>>>> On Mon, Mar 23, 2015 at 5:23 PM, Ash W Matheson <ash.mathe...@gmail.com>
>>>> wrote:
>>>>
>>>>> Ahh, I was going to add it to the run-class.sh script.
>>>>>
>>>>> Yeah, it's already there by default:
>>>>>
>>>>> # Metrics
>>>>> metrics.reporters=snapshot,jmx
>>>>> metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
>>>>> metrics.reporter.snapshot.stream=kafka.metrics
>>>>> metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
>>>>>
>>>>> So, where would I see those metrics?
>>>>>
>>>>> On Mon, Mar 23, 2015 at 5:15 PM, Ash W Matheson <ash.mathe...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> read: I'm a C++ programmer looking at Java for the first time in more
>>>>>> than 10 years
>>>>>>
>>>>>> On Mon, Mar 23, 2015 at 5:13 PM, Ash W Matheson <ash.mathe...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I'm assuming I have JMX defined... where would that get set?
>>>>>>>
>>>>>>> On Mon, Mar 23, 2015 at 5:08 PM, Chinmay Soman <
>>>>>>> chinmay.cere...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hey Ash,
>>>>>>>>
>>>>>>>> Can you check your job metrics (if you have the JMX metrics defined) to
>>>>>>>> see whether your job is actually doing anything? My only guess at this
>>>>>>>> point is that the process method is not being called because somehow
>>>>>>>> there's no incoming data. I could be totally wrong, of course.
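The JsonParseException quoted above comes from Samza's JsonSerde handing the raw bytes to Jackson: the first non-whitespace character Jackson saw was 'M' (character code 77), which cannot start any JSON value, so a plain sentence from the publisher is rejected during deserialization, before process() is ever called. Either the publisher has to emit JSON, or the input stream needs a non-JSON serde. The sketch below is a rough heuristic, not a JSON validator, for checking what is actually on the topic; JsonSniffer and startsLikeJsonValue are hypothetical names.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical diagnostic (not part of Samza or Jackson): a rough check of
// whether a raw payload could be JSON at all, based only on its first
// non-whitespace character. It does not validate the rest of the payload.
final class JsonSniffer {

    static boolean startsLikeJsonValue(byte[] payload) {
        String text = new String(payload, StandardCharsets.UTF_8);
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            if (c == ' ' || c == '\t' || c == '\n' || c == '\r') {
                continue; // JSON insignificant whitespace
            }
            // A JSON value starts an object, array, string, number,
            // or one of the literals true / false / null.
            return c == '{' || c == '[' || c == '"' || c == '-'
                || (c >= '0' && c <= '9')
                || c == 't' || c == 'f' || c == 'n';
        }
        return false; // empty or whitespace-only payload
    }

    public static void main(String[] args) {
        byte[] sentence = "My example sentence.".getBytes(StandardCharsets.UTF_8);
        byte[] json = "{\"raw\": \"My example sentence.\"}".getBytes(StandardCharsets.UTF_8);
        System.out.println(startsLikeJsonValue(sentence)); // false ('M' is code 77)
        System.out.println(startsLikeJsonValue(json));     // true
    }
}
```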
>>>>>>>>
>>>>>>>> On Mon, Mar 23, 2015 at 4:28 PM, Ash W Matheson <
>>>>>>>> ash.mathe...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Just to be clear, here's what's changed from the default hello-samza repo:
>>>>>>>>>
>>>>>>>>> wikipedia-parser.properties ==========================
>>>>>>>>> task.inputs=kafka.myTopic
>>>>>>>>> systems.kafka.consumer.zookeeper.connect=ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com:2181/
>>>>>>>>> systems.kafka.consumer.auto.offset.reset=smallest
>>>>>>>>>
>>>>>>>>> WikipediaParserStreamTask.java =====================
>>>>>>>>> public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
>>>>>>>>>   Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
>>>>>>>>>   WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
>>>>>>>>>
>>>>>>>>>   try {
>>>>>>>>>     System.out.println(event.getRawEvent());
>>>>>>>>>     // Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
>>>>>>>>>
>>>>>>>>>     // parsedJsonObject.put("channel", event.getChannel());
>>>>>>>>>     // parsedJsonObject.put("source", event.getSource());
>>>>>>>>>     // parsedJsonObject.put("time", event.getTime());
>>>>>>>>>
>>>>>>>>>     // collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-edits"), parsedJsonObject));
>>>>>>>>>
>>>>>>>>> as well as the aforementioned changes to the log4j.xml file.
>>>>>>>>>
>>>>>>>>> The data pushed into the 'myTopic' topic is nothing more than a sentence.
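The serde switch mentioned earlier in the thread (systems.kafka.samza.msg.serde from json to string) can also be scoped to the input stream only, so the json serde stays in place for other streams such as wikipedia-edits. Here is a sketch of that fragment, parsed with java.util.Properties, which reads the same key=value syntax as wikipedia-parser.properties. The serializers.registry.* and *.samza.msg.serde keys follow Samza's configuration naming; the stream-level override and its precedence are assumptions to verify against the Samza version in use, and SerdeConfigSketch and serdeFor are hypothetical helpers.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Sketch of a serde configuration fragment. The keys follow Samza's naming;
// SerdeConfigSketch and serdeFor are hypothetical helpers for illustration.
final class SerdeConfigSketch {

    // Assumed fragment: plain-text sentences on myTopic, JSON as the system default.
    static final String FRAGMENT = String.join("\n",
        "serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory",
        "serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory",
        "systems.kafka.samza.msg.serde=json",
        "systems.kafka.streams.myTopic.samza.msg.serde=string");

    static Properties load() {
        Properties props = new Properties();
        try {
            props.load(new StringReader(FRAGMENT));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory reader
        }
        return props;
    }

    // Assumed precedence: a stream-level serde overrides the system-level default.
    static String serdeFor(Properties props, String system, String stream) {
        String streamLevel = props.getProperty(
            "systems." + system + ".streams." + stream + ".samza.msg.serde");
        return streamLevel != null
            ? streamLevel
            : props.getProperty("systems." + system + ".samza.msg.serde");
    }

    public static void main(String[] args) {
        Properties props = load();
        System.out.println(serdeFor(props, "kafka", "myTopic"));         // string
        System.out.println(serdeFor(props, "kafka", "wikipedia-edits")); // json
    }
}
```

With a string serde on myTopic, envelope.getMessage() returns a String for that stream, so the task must stop casting it to a Map.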
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Mar 23, 2015 at 4:16 PM, Ash W Matheson <
>>>>>>>>> ash.mathe...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Yep, I modified log4j.xml to look like this:
>>>>>>>>>>
>>>>>>>>>> <root>
>>>>>>>>>>   <priority value="debug" />
>>>>>>>>>>   <appender-ref ref="RollingAppender"/>
>>>>>>>>>>   <appender-ref ref="jmx" />
>>>>>>>>>> </root>
>>>>>>>>>>
>>>>>>>>>> Not sure what you mean by #2.
>>>>>>>>>>
>>>>>>>>>> However, I'm running now, not seeing any exceptions, but still not seeing
>>>>>>>>>> any output from System.out.println(...)
>>>>>>>>>>
>>>>>>>>>> On Mon, Mar 23, 2015 at 11:29 AM, Naveen Somasundaram <
>>>>>>>>>> nsomasunda...@linkedin.com.invalid> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hey Ash,
>>>>>>>>>>> 1. Did you happen to modify your log4j.xml?
>>>>>>>>>>> 2. Can you send the classpath that was printed when the job started?
>>>>>>>>>>> I am wondering whether log4j was not loaded, or is not present on the
>>>>>>>>>>> path where it is looking for it. If you have been using hello-samza, it
>>>>>>>>>>> should have pulled it from Maven.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Naveen
>>>>>>>>>>>
>>>>>>>>>>> On Mar 22, 2015, at 10:35 AM, Ash W Matheson <ash.mathe...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hey all,
>>>>>>>>>>>>
>>>>>>>>>>>> I'm currently evaluating Samza and am running into some odd issues.
>>>>>>>>>>>>
>>>>>>>>>>>> I'm working off the 'hello-samza' repo and trying to parse a simple
>>>>>>>>>>>> Kafka topic that I've produced through an external Java app (nothing
>>>>>>>>>>>> other than a series of sentences), and it's failing pretty hard
>>>>>>>>>>>> for me.
>>>>>>>>>>>> The base 'hello-samza' set of apps works fine, but as soon as I
>>>>>>>>>>>> change the configuration to look at a different Kafka/ZooKeeper, I get
>>>>>>>>>>>> the following in the userlogs:
>>>>>>>>>>>>
>>>>>>>>>>>> 2015-03-22 17:07:09 KafkaSystemAdmin [WARN] Unable to fetch last offsets
>>>>>>>>>>>> for streams [myTopic] due to kafka.common.KafkaException: fetching topic
>>>>>>>>>>>> metadata for topics [Set(myTopic)] from broker
>>>>>>>>>>>> [ArrayBuffer(id:0,host:redacted,port:9092)] failed. Retrying.
>>>>>>>>>>>>
>>>>>>>>>>>> The modifications are pretty straightforward. In
>>>>>>>>>>>> wikipedia-parser.properties, I've changed the following:
>>>>>>>>>>>> task.inputs=kafka.myTopic
>>>>>>>>>>>> systems.kafka.consumer.zookeeper.connect=redacted:2181/
>>>>>>>>>>>> systems.kafka.consumer.auto.offset.reset=smallest
>>>>>>>>>>>> systems.kafka.producer.metadata.broker.list=redacted:9092
>>>>>>>>>>>>
>>>>>>>>>>>> and in the actual Java file, WikipediaParserStreamTask.java:
>>>>>>>>>>>> public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
>>>>>>>>>>>>   Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
>>>>>>>>>>>>   WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
>>>>>>>>>>>>
>>>>>>>>>>>>   try {
>>>>>>>>>>>>     System.out.println(event.getRawEvent());
>>>>>>>>>>>>
>>>>>>>>>>>> And then I followed the compile/extract/run process outlined on the
>>>>>>>>>>>> hello-samza website.
>>>>>>>>>>>>
>>>>>>>>>>>> Any thoughts? I've looked online for any 'super simple' examples of
>>>>>>>>>>>> ingesting Kafka in Samza, with very little success.
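The KafkaSystemAdmin warning in the original message means Samza could not fetch topic metadata from the broker at redacted:9092. A plain TCP connect from the machine running the job is a quick way to rule out basic reachability problems (DNS, firewall, EC2 security groups) for both the broker port and the ZooKeeper port. This is a hypothetical helper, not part of Samza or Kafka, and it does not exercise Kafka's metadata protocol, so a successful connect does not prove the broker will answer metadata requests.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical pre-flight helper (not part of Samza or Kafka): can this
// machine open a plain TCP connection to host:port within the timeout?
final class ReachabilityCheck {

    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers unknown hosts, refused connections and timeouts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the real broker / ZooKeeper host (redacted in the thread) as args[0].
        String host = args.length > 0 ? args[0] : "localhost";
        System.out.println("kafka broker :9092 -> " + canConnect(host, 9092, 2000));
        System.out.println("zookeeper    :2181 -> " + canConnect(host, 2181, 2000));
    }
}
```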
>>>>>>>>
>>>>>>>> --
>>>>>>>> Thanks and regards
>>>>>>>>
>>>>>>>> Chinmay Soman
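Naveen's second question (the classpath printed when the job started) is about confirming that log4j is actually present. The same check can be made from inside the JVM: list the java.class.path entries that mention log4j, and try to load a log4j 1.x class without initializing it. ClasspathCheck and its methods are hypothetical; org.apache.log4j.Logger is the log4j 1.x logger class, matching the <root>/<priority> syntax of the log4j.xml quoted in the thread.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical diagnostic (not part of Samza): is log4j on the classpath?
final class ClasspathCheck {

    // Returns the classpath entries whose path contains the given text.
    static List<String> entriesContaining(String classpath, String needle) {
        List<String> matches = new ArrayList<>();
        for (String entry : classpath.split(Pattern.quote(File.pathSeparator))) {
            if (entry.contains(needle)) {
                matches.add(entry);
            }
        }
        return matches;
    }

    // True if the class can be found, without running its static initializers.
    static boolean isLoadable(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String classpath = System.getProperty("java.class.path", "");
        System.out.println("log4j entries:      " + entriesContaining(classpath, "log4j"));
        System.out.println("log4j 1.x loadable: " + isLoadable("org.apache.log4j.Logger"));
    }
}
```

When this runs inside a YARN-launched Samza container, System.out output typically goes to a stdout file in the same container log directory as the stderr file mentioned in the thread, not to stderr itself.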