Hey Ash,

I was referring to the lines before the try block:

Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
try {
  System.out.println("[DWH] should see this");
  System.out.println(event.getRawEvent());
  …

Did you remove those lines as well?

Navina

On 3/23/15, 8:31 PM, "Ash W Matheson" <ash.mathe...@gmail.com> wrote:

>Just looking at the diff I posted and it's:
>
>   try {
> -   Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
> +   System.out.println("[DWH] should see this");
> +   System.out.println(event.getRawEvent());
> +   // Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
>
>I've removed the Map and added two System.out.println calls. So no, there
>shouldn't be any reference to
>Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
>in the source java file.
>
>On Mon, Mar 23, 2015 at 7:42 PM, Ash W Matheson <ash.mathe...@gmail.com>
>wrote:
>
>> I'm in transit right now but if memory serves me everything should be
>> commented out of that method except for the System.out.println call. I'll
>> be home shortly and can confirm.
>> On Mar 23, 2015 7:28 PM, "Navina Ramesh" <nram...@linkedin.com.invalid>
>> wrote:
>>
>>> Hi Ash,
>>> I just ran wikipedia-parser with your patch. Looks like you have set the
>>> message serde correctly in the configs. However, the original code still
>>> converts it into a Map for consumption in the WikipediaFeedEvent.
>>> I am seeing the following (expected):
>>>
>>> 2015-03-23 19:17:49 SamzaContainerExceptionHandler [ERROR] Uncaught exception in thread (name=main). Exiting process now.
>>> java.lang.ClassCastException: java.lang.String cannot be cast to java.util.Map
>>>     at samza.examples.wikipedia.task.WikipediaParserStreamTask.process(WikipediaParserStreamTask.java:38)
>>>     at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:133)
>>>
>>> Did you make the changes to fix this error? Your patch doesn't seem to
>>> have that.
>>> Line 38 Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
>>>
>>> Lmk so I can investigate further.
>>>
>>> Cheers!
>>> Navina
>>>
>>> On 3/23/15, 6:43 PM, "Ash W Matheson" <ash.mathe...@gmail.com> wrote:
>>>
>>> >If anyone's interested, I've posted a diff of the project here:
>>> >http://pastebin.com/6ZW6Y1Vu
>>> >and the python publisher here: http://pastebin.com/2NvTFDFx
>>> >
>>> >if you want to take a stab at it.
>>> >
>>> >On Mon, Mar 23, 2015 at 6:04 PM, Ash W Matheson <ash.mathe...@gmail.com>
>>> >wrote:
>>> >
>>> >> Ok, so very simple test, all running on a local machine, not across
>>> >> networks and all in the hello-samza repo this time around.
>>> >>
>>> >> I've got the datapusher.py file set up to push data into localhost. One
>>> >> event per second.
>>> >> And a modified hello-samza where I've modified the
>>> >> WikipediaParserStreamTask.java class to simply read what's there.
>>> >>
>>> >> Running them both now and I'm seeing in the stderr files
>>> >> (deploy/yarn/logs/userlogs/application_XXXXX/container_YYYY/stderr) the
>>> >> following:
>>> >>
>>> >> Exception in thread "main" org.apache.samza.system.SystemConsumersException: Cannot deserialize an incoming message.
>>> >>     at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:293)
>>> >>     at org.apache.samza.system.SystemConsumers.org$apache$samza$system$SystemConsumers$$poll(SystemConsumers.scala:260)
>>> >>     at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
>>> >>     at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
>>> >>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>> >>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>> >>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>> >>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>> >>     at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
>>> >>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>> >>     at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
>>> >>     at scala.collection.SetLike$class.map(SetLike.scala:93)
>>> >>     at scala.collection.AbstractSet.map(Set.scala:47)
>>> >>     at org.apache.samza.system.SystemConsumers.refresh(SystemConsumers.scala:276)
>>> >>     at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:213)
>>> >>     at org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(RunLoop.scala:81)
>>> >>     at org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(RunLoop.scala:81)
>>> >>     at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>>> >>     at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>>> >>     at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:80)
>>> >>     at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>>> >>     at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>>> >>     at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
>>> >>     at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
>>> >>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
>>> >>     at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
>>> >>     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
>>> >>     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>>> >> Caused by: org.codehaus.jackson.JsonParseException: Unexpected character ('M' (code 77)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
>>> >>     at [Source: [B@5454d285; line: 1, column: 2]
>>> >>     at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
>>> >>     at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:385)
>>> >>     at org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(JsonParserMinimalBase.java:306)
>>> >>     at org.codehaus.jackson.impl.Utf8StreamParser._handleUnexpectedValue(Utf8StreamParser.java:1581)
>>> >>     at org.codehaus.jackson.impl.Utf8StreamParser._nextTokenNotInObject(Utf8StreamParser.java:436)
>>> >>     at org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamParser.java:322)
>>> >>     at org.codehaus.jackson.map.ObjectMapper._initForReading(ObjectMapper.java:2432)
>>> >>     at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2389)
>>> >>     at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1667)
>>> >>     at org.apache.samza.serializers.JsonSerde.fromBytes(JsonSerde.scala:33)
>>> >>     at org.apache.samza.serializers.SerdeManager.fromBytes(SerdeManager.scala:115)
>>> >>     at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:290)
>>> >>
>>> >> I changed the systems.kafka.samza.msg.serde=json to 'string' a while back,
>>> >> but that caused a separate exception. However that was many, MANY attempts
>>> >> ago.
>>> >>
>>> >> On Mon, Mar 23, 2015 at 5:23 PM, Ash W Matheson <ash.mathe...@gmail.com>
>>> >> wrote:
>>> >>
>>> >>> Ahh, I was going to add it to the run-class.sh script.
>>> >>>
>>> >>> Yeah, it's already there by default:
>>> >>>
>>> >>> # Metrics
>>> >>> metrics.reporters=snapshot,jmx
>>> >>> metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
>>> >>> metrics.reporter.snapshot.stream=kafka.metrics
>>> >>> metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
>>> >>>
>>> >>> So, where would I see those metrics?
>>> >>>
>>> >>> On Mon, Mar 23, 2015 at 5:15 PM, Ash W Matheson <ash.mathe...@gmail.com>
>>> >>> wrote:
>>> >>>
>>> >>>> read: I'm a C++ programmer looking at Java for the first time in > 10
>>> >>>> years
>>> >>>>
>>> >>>> On Mon, Mar 23, 2015 at 5:13 PM, Ash W Matheson <ash.mathe...@gmail.com>
>>> >>>> wrote:
>>> >>>>
>>> >>>>> I'm assuming I have Jmx defined ... where would that get set?
>>> >>>>>
>>> >>>>> On Mon, Mar 23, 2015 at 5:08 PM, Chinmay Soman <chinmay.cere...@gmail.com>
>>> >>>>> wrote:
>>> >>>>>
>>> >>>>>> Hey Ash,
>>> >>>>>>
>>> >>>>>> Can you see your job metrics (if you have the Jmx metrics defined) to see
>>> >>>>>> if your job is actually doing anything ? My only guess at this point is the
>>> >>>>>> process method is not being called because somehow there's no incoming
>>> >>>>>> data. I could be totally wrong of course.
>>> >>>>>>
>>> >>>>>> On Mon, Mar 23, 2015 at 4:28 PM, Ash W Matheson <ash.mathe...@gmail.com>
>>> >>>>>> wrote:
>>> >>>>>>
>>> >>>>>> > Just to be clear, here's what's changed from the default hello-samza repo:
>>> >>>>>> >
>>> >>>>>> > wikipedia-parser.properties==========================
>>> >>>>>> > task.inputs=kafka.myTopic
>>> >>>>>> > systems.kafka.consumer.zookeeper.connect=ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com:2181/
>>> >>>>>> > systems.kafka.consumer.auto.offset.reset=smallest
>>> >>>>>> >
>>> >>>>>> > WikipediaParserStreamTask.java =====================
>>> >>>>>> > public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
>>> >>>>>> >   Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
>>> >>>>>> >   WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
>>> >>>>>> >
>>> >>>>>> >   try {
>>> >>>>>> >     System.out.println(event.getRawEvent());
>>> >>>>>> >     // Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
>>> >>>>>> >
>>> >>>>>> >     // parsedJsonObject.put("channel", event.getChannel());
>>> >>>>>> >     // parsedJsonObject.put("source", event.getSource());
>>> >>>>>> >     // parsedJsonObject.put("time", event.getTime());
>>> >>>>>> >
>>> >>>>>> >     // collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-edits"), parsedJsonObject));
>>> >>>>>> >
>>> >>>>>> > as well as the aforementioned changes to the log4j.xml file.
>>> >>>>>> >
>>> >>>>>> > The data pushed into the 'myTopic' topic is nothing more than a sentence.
>>> >>>>>> >
>>> >>>>>> > On Mon, Mar 23, 2015 at 4:16 PM, Ash W Matheson <ash.mathe...@gmail.com>
>>> >>>>>> > wrote:
>>> >>>>>> >
>>> >>>>>> > > yep, modified log4j.xml to look like this:
>>> >>>>>> > >
>>> >>>>>> > >   <root>
>>> >>>>>> > >     <priority value="debug" />
>>> >>>>>> > >     <appender-ref ref="RollingAppender"/>
>>> >>>>>> > >     <appender-ref ref="jmx" />
>>> >>>>>> > >   </root>
>>> >>>>>> > >
>>> >>>>>> > > Not sure what you mean by #2.
>>> >>>>>> > >
>>> >>>>>> > > However, I'm running now, not seeing any exceptions, but still not seeing
>>> >>>>>> > > any output from System.out.println(...)
>>> >>>>>> > >
>>> >>>>>> > > On Mon, Mar 23, 2015 at 11:29 AM, Naveen Somasundaram <nsomasunda...@linkedin.com.invalid>
>>> >>>>>> > > wrote:
>>> >>>>>> > >
>>> >>>>>> > >> Hey Ash,
>>> >>>>>> > >>   1. Did you happen to modify your log4j.xml ?
>>> >>>>>> > >>   2. Can you print the class path that was printed when the
>>> >>>>>> > >> job started ? I am wondering if log4j was not loaded or not present in the
>>> >>>>>> > >> path where it's looking for. If you have been using hello samza, it should
>>> >>>>>> > >> have pulled it from Maven.
>>> >>>>>> > >>
>>> >>>>>> > >> Thanks,
>>> >>>>>> > >> Naveen
>>> >>>>>> > >>
>>> >>>>>> > >> On Mar 22, 2015, at 10:35 AM, Ash W Matheson <ash.mathe...@gmail.com>
>>> >>>>>> > >> wrote:
>>> >>>>>> > >>
>>> >>>>>> > >> > Hey all,
>>> >>>>>> > >> >
>>> >>>>>> > >> > Evaluating Samza currently and am running into some odd issues.
>>> >>>>>> > >> >
>>> >>>>>> > >> > I'm currently working off the 'hello-samza' repo and trying to parse a
>>> >>>>>> > >> > simple kafka topic that I've produced through an external java app (nothing
>>> >>>>>> > >> > other than a series of sentences) and it's failing pretty hard for me. The
>>> >>>>>> > >> > base 'hello-samza' set of apps works fine, but as soon as I change the
>>> >>>>>> > >> > configuration to look at a different Kafka/zookeeper I get the following in
>>> >>>>>> > >> > the userlogs:
>>> >>>>>> > >> >
>>> >>>>>> > >> > 2015-03-22 17:07:09 KafkaSystemAdmin [WARN] Unable to fetch last offsets for streams [myTopic] due to kafka.common.KafkaException: fetching topic metadata for topics [Set(myTopic)] from broker [ArrayBuffer(id:0,host:redacted,port:9092)] failed. Retrying.
>>> >>>>>> > >> >
>>> >>>>>> > >> > The modifications are pretty straightforward. In the
>>> >>>>>> > >> > Wikipedia-parser.properties, I've changed the following:
>>> >>>>>> > >> > task.inputs=kafka.myTopic
>>> >>>>>> > >> > systems.kafka.consumer.zookeeper.connect=redacted:2181/
>>> >>>>>> > >> > systems.kafka.consumer.auto.offset.reset=smallest
>>> >>>>>> > >> > systems.kafka.producer.metadata.broker.list=redacted:9092
>>> >>>>>> > >> >
>>> >>>>>> > >> > and in the actual java file WikipediaParserStreamTask.java
>>> >>>>>> > >> > public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
>>> >>>>>> > >> >   Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
>>> >>>>>> > >> >   WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
>>> >>>>>> > >> >
>>> >>>>>> > >> >   try {
>>> >>>>>> > >> >     System.out.println(event.getRawEvent());
>>> >>>>>> > >> >
>>> >>>>>> > >> > And then following the compile/extract/run process outlined in the
>>> >>>>>> > >> > hello-samza website.
>>> >>>>>> > >> >
>>> >>>>>> > >> > Any thoughts? I've looked online for any 'super simple' examples of
>>> >>>>>> > >> > ingesting kafka in samza with very little success.
>>> >>>>>>
>>> >>>>>> --
>>> >>>>>> Thanks and regards
>>> >>>>>>
>>> >>>>>> Chinmay Soman
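
For reference, the ClassCastException reported at the top of this thread (java.lang.String cannot be cast to java.util.Map) is what one would expect when the message serde is set to string while the task still casts envelope.getMessage() to a Map. Below is a minimal sketch of a task body that tolerates both shapes. It makes several assumptions: MessageShapeDemo and rawText are hypothetical names, the plain Object parameter stands in for envelope.getMessage(), and the "raw" key is only a guess at how the JSON events are laid out. No Samza classes are used, so this illustrates the type check only and is not the actual WikipediaParserStreamTask code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the start of WikipediaParserStreamTask.process().
// `message` plays the role of envelope.getMessage(); no Samza classes are used.
final class MessageShapeDemo {

    // Returns the raw text of a message, whichever serde produced it.
    static String rawText(Object message) {
        if (message instanceof String) {
            // String serde: the payload is already the plain sentence.
            return (String) message;
        }
        if (message instanceof Map) {
            // JSON serde: the payload is a parsed object.
            // The "raw" key is an assumption made for this sketch.
            Object raw = ((Map<?, ?>) message).get("raw");
            return raw == null ? "" : raw.toString();
        }
        String type = (message == null) ? "null" : message.getClass().getName();
        throw new IllegalArgumentException("Unexpected message type: " + type);
    }

    public static void main(String[] args) {
        // A plain sentence, as pushed into 'myTopic' in this thread.
        System.out.println(rawText("My sentence"));

        // A JSON-style event, as the unmodified example would deliver it.
        Map<String, Object> jsonEvent = new HashMap<>();
        jsonEvent.put("raw", "edit text");
        System.out.println(rawText(jsonEvent));
    }
}
```

Checking the runtime type before casting keeps the task from failing on the first message when the configured serde and the code disagree, which is the mismatch Navina points out on line 38.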