Hey Ash, 
I was referring to the lines before the try block.

Map<String, Object> jsonObject = (Map<String, Object>)
envelope.getMessage();
    WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);

    try {
      System.out.println("[DWH] should see this");
      System.out.println(event.getRawEvent());
…


Did you remove those lines as well?

Navina

On 3/23/15, 8:31 PM, "Ash W Matheson" <ash.mathe...@gmail.com> wrote:

>Just looking at the diff I posted and it's:
>
>
>   1.      try {
>   2. -      Map<String, Object> parsedJsonObject =
>parse(event.getRawEvent(
>   ));
>   3. +      System.out.println("[DWH] should see this");
>   4. +      System.out.println(event.getRawEvent());
>   5. +      // Map<String, Object> parsedJsonObject = parse(
>   event.getRawEvent());
>
>
>I've removed the Map and added two System.out.println calls.  So no, there
>shouldn't be any reference to
>Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
>in the source java file.
>
>
>On Mon, Mar 23, 2015 at 7:42 PM, Ash W Matheson <ash.mathe...@gmail.com>
>wrote:
>
>> I'm in transit right now but if memory serves me everything should be
>> commented out of that method except for the System.out.println call.
>>I'll
>> be home shortly and can confirm.
>> On Mar 23, 2015 7:28 PM, "Navina Ramesh" <nram...@linkedin.com.invalid>
>> wrote:
>>
>>> Hi Ash,
>>> I just ran wikipedia-parser with your patch. Looks like you have set
>>>the
>>> message serde correctly in the configs. However, the original code
>>>still
>>> converts it into a Map for consumption in the WikipediaFeedEvent.
>>> I am seeing the following (expected):
>>>
>>> 2015-03-23 19:17:49 SamzaContainerExceptionHandler [ERROR] Uncaught
>>> exception in thread (name=main). Exiting process now.
>>> java.lang.ClassCastException: java.lang.String cannot be cast to
>>> java.util.Map
>>>  at
>>>
>>> 
>>>samza.examples.wikipedia.task.WikipediaParserStreamTask.process(Wikipedi
>>>aPa
>>> rserStreamTask.java:38)
>>>  at
>>>
>>> 
>>>org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(
>>>Tas
>>> kInstance.scala:133)
>>>
>>> Did you make the changes to fix this error? Your patch doesn¹t seem to
>>> have that.
>>> Line 38 Map<String, Object> jsonObject = (Map<String, Object>)
>>> envelope.getMessage();
>>>
>>>
>>>
>>> Lmk so I can investigate further.
>>>
>>> Cheers!
>>> Navina
>>>
>>> On 3/23/15, 6:43 PM, "Ash W Matheson" <ash.mathe...@gmail.com> wrote:
>>>
>>> >If anyone's interested, I've posted a diff of the project here:
>>> >http://pastebin.com/6ZW6Y1Vu
>>> >and the python publisher here: http://pastebin.com/2NvTFDFx
>>> >
>>> >if you want to take a stab at it.
>>> >
>>> >On Mon, Mar 23, 2015 at 6:04 PM, Ash W Matheson
>>><ash.mathe...@gmail.com>
>>> >wrote:
>>> >
>>> >> Ok, so very simple test, all running on a local machine, not across
>>> >> networks and all in the hello-samza repo this time around.
>>> >>
>>> >> I've got the datapusher.py file set up to push data into localhost.
>>>One
>>> >> event per second.
>>> >> And a modified hello-samza where I've modified the
>>> >> WikipediaParserStreamTask.java class to simply read what's there.
>>> >>
>>> >> Running them both now and I'm seeing in the stderr files
>>> >> (deploy/yarn/logs/userlogs/application_XXXXX/container_YYYY/stderr)
>>>the
>>> >> following:
>>> >>
>>> >> Exception in thread "main"
>>> >> org.apache.samza.system.SystemConsumersException: Cannot
>>>deserialize an
>>> >> incoming message.
>>> >>     at
>>> >>
>>>
>>> 
>>>>>org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:2
>>>>>93)
>>> >>     at org.apache.samza.system.SystemConsumers.org
>>> >> 
>>>$apache$samza$system$SystemConsumers$$poll(SystemConsumers.scala:260)
>>> >>     at
>>> >>
>>>
>>> 
>>>>>org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(Syste
>>>>>mCo
>>> >>nsumers.scala:276)
>>> >>     at
>>> >>
>>>
>>> 
>>>>>org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(Syste
>>>>>mCo
>>> >>nsumers.scala:276)
>>> >>     at
>>> >>
>>>
>>> 
>>>>>scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.
>>>>>sca
>>> >>la:244)
>>> >>     at
>>> >>
>>>
>>> 
>>>>>scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.
>>>>>sca
>>> >>la:244)
>>> >>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>> >>     at 
>>>scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>> >>     at
>>> scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
>>> >>     at
>>> >> 
>>>scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>> >>     at
>>> >>
>>>
>>> 
>>>>>scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.s
>>>>>cal
>>> >>a:47)
>>> >>     at scala.collection.SetLike$class.map(SetLike.scala:93)
>>> >>     at scala.collection.AbstractSet.map(Set.scala:47)
>>> >>     at
>>> >>
>>>
>>> 
>>>>>org.apache.samza.system.SystemConsumers.refresh(SystemConsumers.scala:
>>>>>276
>>> >>)
>>> >>     at
>>> >>
>>>
>>> 
>>>>>org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:2
>>>>>13)
>>> >>     at
>>> >>
>>>
>>> 
>>>>>org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply
>>>>>(Ru
>>> >>nLoop.scala:81)
>>> >>     at
>>> >>
>>>
>>> 
>>>>>org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply
>>>>>(Ru
>>> >>nLoop.scala:81)
>>> >>     at
>>> >> 
>>>org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>>> >>     at 
>>>org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>>> >>     at
>>> >>
>>>
>>> 
>>>>>org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.sc
>>>>>ala
>>> >>:80)
>>> >>     at
>>> >> 
>>>org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>>> >>     at 
>>>org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>>> >>     at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
>>> >>     at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
>>> >>     at
>>> >> 
>>>org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
>>> >>     at
>>> >>
>>>
>>> 
>>>>>org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.sca
>>>>>la:
>>> >>108)
>>> >>     at
>>> >>
>>> 
>>>org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
>>> >>     at
>>> >>org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>>> >> Caused by: org.codehaus.jackson.JsonParseException: Unexpected
>>> character
>>> >> ('M' (code 77)): expected a valid value (number, String, array,
>>>object,
>>> >> 'true', 'false' or 'null')
>>> >>  at [Source: [B@5454d285; line: 1, column: 2]
>>> >>     at
>>> >> 
>>>org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
>>> >>     at
>>> >>
>>>
>>> 
>>>>>org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParse
>>>>>rMi
>>> >>nimalBase.java:385)
>>> >>     at
>>> >>
>>>
>>> 
>>>>>org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(
>>>>>Jso
>>> >>nParserMinimalBase.java:306)
>>> >>     at
>>> >>
>>>
>>> 
>>>>>org.codehaus.jackson.impl.Utf8StreamParser._handleUnexpectedValue(Utf8
>>>>>Str
>>> >>eamParser.java:1581)
>>> >>     at
>>> >>
>>>
>>> 
>>>>>org.codehaus.jackson.impl.Utf8StreamParser._nextTokenNotInObject(Utf8S
>>>>>tre
>>> >>amParser.java:436)
>>> >>     at
>>> >>
>>>
>>> 
>>>>>org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamParser.
>>>>>jav
>>> >>a:322)
>>> >>     at
>>> >>
>>>
>>> 
>>>>>org.codehaus.jackson.map.ObjectMapper._initForReading(ObjectMapper.jav
>>>>>a:2
>>> >>432)
>>> >>     at
>>> >>
>>>
>>> 
>>>>>org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.ja
>>>>>va:
>>> >>2389)
>>> >>     at
>>> >> 
>>>org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1667)
>>> >>     at
>>> >>org.apache.samza.serializers.JsonSerde.fromBytes(JsonSerde.scala:33)
>>> >>     at
>>> >>
>>>
>>> 
>>>>>org.apache.samza.serializers.SerdeManager.fromBytes(SerdeManager.scala
>>>>>:11
>>> >>5)
>>> >>     at
>>> >>
>>>
>>> 
>>>>>org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:2
>>>>>90)
>>> >>
>>> >>
>>> >> I changed the systems.kafka.samza.msg.serde=json to 'string' a while
>>> >>back,
>>> >> but that caused a separate exception.  However that was many, MANY
>>> >>attempts
>>> >> ago.
>>> >>
>>> >> On Mon, Mar 23, 2015 at 5:23 PM, Ash W Matheson <
>>> ash.mathe...@gmail.com>
>>> >> wrote:
>>> >>
>>> >>> Ahh, I was going to add it to the run-class.sh script.
>>> >>>
>>> >>> Yeah, it's already there by default:
>>> >>>
>>> >>>
>>> >>> # Metrics
>>> >>> metrics.reporters=snapshot,jmx
>>> >>>
>>> >>>
>>>
>>> 
>>>>>>metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.Met
>>>>>>ric
>>> >>>sSnapshotReporterFactory
>>> >>> metrics.reporter.snapshot.stream=kafka.metrics
>>> >>>
>>> >>>
>>>
>>> 
>>>>>>metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxRepor
>>>>>>ter
>>> >>>Factory
>>> >>>
>>> >>> So, where would I see those metrics?
>>> >>>
>>> >>> On Mon, Mar 23, 2015 at 5:15 PM, Ash W Matheson
>>> >>><ash.mathe...@gmail.com>
>>> >>> wrote:
>>> >>>
>>> >>>> read: I'm a C++ programmer looking at Java for the first time in
>>>> 10
>>> >>>> years
>>> >>>>
>>> >>>> On Mon, Mar 23, 2015 at 5:13 PM, Ash W Matheson
>>> >>>><ash.mathe...@gmail.com>
>>> >>>> wrote:
>>> >>>>
>>> >>>>> I'm assuming I have Jmx defined ... where would that get set?
>>> >>>>>
>>> >>>>> On Mon, Mar 23, 2015 at 5:08 PM, Chinmay Soman <
>>> >>>>> chinmay.cere...@gmail.com> wrote:
>>> >>>>>
>>> >>>>>> Hey Ash,
>>> >>>>>>
>>> >>>>>> Can you see your job metrics (if you have the Jmx metrics
>>>defined)
>>> >>>>>>to
>>> >>>>>> see
>>> >>>>>> if your job is actually doing anything ? My only guess at this
>>> point
>>> >>>>>> is the
>>> >>>>>> process method is not being called because somehow there's no
>>> >>>>>>incoming
>>> >>>>>> data. I could be totally wrong of course.
>>> >>>>>>
>>> >>>>>> On Mon, Mar 23, 2015 at 4:28 PM, Ash W Matheson <
>>> >>>>>> ash.mathe...@gmail.com>
>>> >>>>>> wrote:
>>> >>>>>>
>>> >>>>>> > Just to be clear, here's what's changed from the default
>>> >>>>>>hello-samza
>>> >>>>>> repo:
>>> >>>>>> >
>>> >>>>>> > wikipedia-parser.properties==========================
>>> >>>>>> > task.inputs=kafka.myTopic
>>> >>>>>> > systems.kafka.consumer.zookeeper.connect=
>>> >>>>>> > ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com:2181/
>>> >>>>>> > systems.kafka.consumer.auto.offset.reset=smallest
>>> >>>>>> >
>>> >>>>>> > WikipediaParserStreamTask.java =====================
>>> >>>>>> >   public void process(IncomingMessageEnvelope envelope,
>>> >>>>>> MessageCollector
>>> >>>>>> > collector, TaskCoordinator coordinator) {
>>> >>>>>> >     Map<String, Object> jsonObject = (Map<String, Object>)
>>> >>>>>> > envelope.getMessage();
>>> >>>>>> >     WikipediaFeedEvent event = new
>>> WikipediaFeedEvent(jsonObject);
>>> >>>>>> >
>>> >>>>>> >     try {
>>> >>>>>> >       System.out.println(event.getRawEvent());
>>> >>>>>> >       // Map<String, Object> parsedJsonObject =
>>> >>>>>> parse(event.getRawEvent());
>>> >>>>>> >
>>> >>>>>> >       // parsedJsonObject.put("channel", event.getChannel());
>>> >>>>>> >       // parsedJsonObject.put("source", event.getSource());
>>> >>>>>> >       // parsedJsonObject.put("time", event.getTime());
>>> >>>>>> >
>>> >>>>>> >       // collector.send(new OutgoingMessageEnvelope(new
>>> >>>>>> > SystemStream("kafka", "wikipedia-edits"), parsedJsonObject));
>>> >>>>>> >
>>> >>>>>> > as well as the aforementioned changes to the log4j.xml file.
>>> >>>>>> >
>>> >>>>>> > The data pushed into the 'myTopic' topic is nothing more than
>>>a
>>> >>>>>> sentence.
>>> >>>>>> >
>>> >>>>>> >
>>> >>>>>> > On Mon, Mar 23, 2015 at 4:16 PM, Ash W Matheson <
>>> >>>>>> ash.mathe...@gmail.com>
>>> >>>>>> > wrote:
>>> >>>>>> >
>>> >>>>>> > > yep, modified log4j.xml to look like this:
>>> >>>>>> > >
>>> >>>>>> > >   <root>
>>> >>>>>> > >     <priority value="debug" />
>>> >>>>>> > >     <appender-ref ref="RollingAppender"/>
>>> >>>>>> > >     <appender-ref ref="jmx" />
>>> >>>>>> > >   </root>
>>> >>>>>> > >
>>> >>>>>> > > Not sure what you mean by #2.
>>> >>>>>> > >
>>> >>>>>> > > However, I'm running now, not seeing any exceptions, but
>>>still
>>> >>>>>>not
>>> >>>>>> seeing
>>> >>>>>> > > any output from System.out.println(...)
>>> >>>>>> > >
>>> >>>>>> > > On Mon, Mar 23, 2015 at 11:29 AM, Naveen Somasundaram <
>>> >>>>>> > > nsomasunda...@linkedin.com.invalid> wrote:
>>> >>>>>> > >
>>> >>>>>> > >> Hey Ash,
>>> >>>>>> > >>                1. Did you happen to modify your log4j.xml ?
>>> >>>>>> > >>                2. Can you print the class path that was
>>> printed
>>> >>>>>> when the
>>> >>>>>> > >> job started ? I am wondering if log4j was not loaded or not
>>> >>>>>> present in
>>> >>>>>> > the
>>> >>>>>> > >> path where it¹s looking for. If you have been using hello
>>> >>>>>>samza,
>>> >>>>>> it
>>> >>>>>> > should
>>> >>>>>> > >> have pulled it from Maven.
>>> >>>>>> > >>
>>> >>>>>> > >> Thanks,
>>> >>>>>> > >> Naveen
>>> >>>>>> > >>
>>> >>>>>> > >> On Mar 22, 2015, at 10:35 AM, Ash W Matheson <
>>> >>>>>> ash.mathe...@gmail.com>
>>> >>>>>> > >> wrote:
>>> >>>>>> > >>
>>> >>>>>> > >> > Hey all,
>>> >>>>>> > >> >
>>> >>>>>> > >> > Evaluating Samza currently and am running into some odd
>>> >>>>>>issues.
>>> >>>>>> > >> >
>>> >>>>>> > >> > I'm currently working off the 'hello-samza' repo and
>>>trying
>>> >>>>>>to
>>> >>>>>> parse a
>>> >>>>>> > >> > simple kafka topic that I've produced through an extenal
>>> java
>>> >>>>>> app
>>> >>>>>> > >> (nothing
>>> >>>>>> > >> > other than a series of sentences) and it's failing pretty
>>> >>>>>>hard
>>> >>>>>> for me.
>>> >>>>>> > >> The
>>> >>>>>> > >> > base 'hello-samza' set of apps works fine, but as soon
>>>as I
>>> >>>>>> change the
>>> >>>>>> > >> > configuration to look at a different Kafka/zookeeper I
>>>get
>>> >>>>>>the
>>> >>>>>> > >> following in
>>> >>>>>> > >> > the userlogs:
>>> >>>>>> > >> >
>>> >>>>>> > >> > 2015-03-22 17:07:09 KafkaSystemAdmin [WARN] Unable to
>>>fetch
>>> >>>>>>last
>>> >>>>>> > offsets
>>> >>>>>> > >> > for streams [myTopic] due to kafka.common.KafkaException:
>>> >>>>>> fetching
>>> >>>>>> > topic
>>> >>>>>> > >> > metadata for topics [Set(myTopic)] from broker
>>> >>>>>> > >> > [ArrayBuffer(id:0,host:redacted,port:9092)] failed.
>>> Retrying.
>>> >>>>>> > >> >
>>> >>>>>> > >> >
>>> >>>>>> > >> > The modifications are pretty straightforward.  In the
>>> >>>>>> > >> > Wikipedia-parser.properties, I've changed the following:
>>> >>>>>> > >> > task.inputs=kafka.myTopic
>>> >>>>>> > >> > systems.kafka.consumer.zookeeper.connect=redacted:2181/
>>> >>>>>> > >> > systems.kafka.consumer.auto.offset.reset=smallest
>>> >>>>>> > >> > systems.kafka.producer.metadata.broker.list=redacted:9092
>>> >>>>>> > >> >
>>> >>>>>> > >> > and in the actual java file
>>>WikipediaParserStreamTask.java
>>> >>>>>> > >> >  public void process(IncomingMessageEnvelope envelope,
>>> >>>>>> > MessageCollector
>>> >>>>>> > >> > collector, TaskCoordinator coordinator) {
>>> >>>>>> > >> >    Map<String, Object> jsonObject = (Map<String, Object>)
>>> >>>>>> > >> > envelope.getMessage();
>>> >>>>>> > >> >    WikipediaFeedEvent event = new
>>> >>>>>> WikipediaFeedEvent(jsonObject);
>>> >>>>>> > >> >
>>> >>>>>> > >> >    try {
>>> >>>>>> > >> >        System.out.println(event.getRawEvent());
>>> >>>>>> > >> >
>>> >>>>>> > >> > And then following the compile/extract/run process
>>>outlined
>>> >>>>>>in
>>> >>>>>> the
>>> >>>>>> > >> > hello-samza website.
>>> >>>>>> > >> >
>>> >>>>>> > >> > Any thoughts?  I've looked online for any 'super simple'
>>> >>>>>> examples of
>>> >>>>>> > >> > ingesting kafka in samza with very little success.
>>> >>>>>> > >>
>>> >>>>>> > >>
>>> >>>>>> > >
>>> >>>>>> >
>>> >>>>>>
>>> >>>>>>
>>> >>>>>>
>>> >>>>>> --
>>> >>>>>> Thanks and regards
>>> >>>>>>
>>> >>>>>> Chinmay Soman
>>> >>>>>>
>>> >>>>>
>>> >>>>>
>>> >>>>
>>> >>>
>>> >>
>>>
>>>

Reply via email to