Just looking at the diff I posted and it's:

      try {
 -      Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
 +      System.out.println("[DWH] should see this");
 +      System.out.println(event.getRawEvent());
 +      // Map<String, Object> parsedJsonObject = parse(event.getRawEvent());


I've removed the Map and added two System.out.println calls.  So no, there
shouldn't be any reference to
Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
in the source java file.
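
For anyone hitting the same pair of errors in this thread: with systems.kafka.samza.msg.serde=json, the serde tries to JSON-parse the raw sentence (the JsonParseException on 'M' quoted further down), while with a string serde the original line-38 cast of the message to a Map fails (the ClassCastException Navina saw). A minimal, self-contained sketch of that cast failure — plain Java standing in for the Samza classes, not the actual API:

```java
import java.util.Collections;
import java.util.Map;

// Sketch only: envelope.getMessage() in Samza returns Object. With
// systems.kafka.samza.msg.serde=string it holds a String, so the
// original "line 38" cast to Map<String, Object> throws at runtime.
public class SerdeCastSketch {

    static String describeMessage(Object message) {
        try {
            @SuppressWarnings("unchecked")
            Map<String, Object> jsonObject = (Map<String, Object>) message; // the line-38 cast
            return "map with " + jsonObject.size() + " entries";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        // string serde delivers the raw sentence as a String -> cast fails
        System.out.println(describeMessage("My test sentence"));
        // json serde (fed real JSON) would deliver a Map -> cast succeeds
        System.out.println(describeMessage(Collections.<String, Object>emptyMap()));
    }
}
```

With the string serde, the corresponding fix would be to treat the message as text directly, e.g. String raw = (String) envelope.getMessage(); — a sketch, untested against this patch.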


On Mon, Mar 23, 2015 at 7:42 PM, Ash W Matheson <ash.mathe...@gmail.com>
wrote:

> I'm in transit right now but if memory serves me everything should be
> commented out of that method except for the System.out.println call. I'll
> be home shortly and can confirm.
> On Mar 23, 2015 7:28 PM, "Navina Ramesh" <nram...@linkedin.com.invalid>
> wrote:
>
>> Hi Ash,
>> I just ran wikipedia-parser with your patch. Looks like you have set the
>> message serde correctly in the configs. However, the original code still
>> converts it into a Map for consumption in the WikipediaFeedEvent.
>> I am seeing the following (expected):
>>
>> 2015-03-23 19:17:49 SamzaContainerExceptionHandler [ERROR] Uncaught
>> exception in thread (name=main). Exiting process now.
>> java.lang.ClassCastException: java.lang.String cannot be cast to java.util.Map
>>  at samza.examples.wikipedia.task.WikipediaParserStreamTask.process(WikipediaParserStreamTask.java:38)
>>  at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:133)
>>
>> Did you make the changes to fix this error? Your patch doesn't seem to
>> have that.
>> Line 38: Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
>>
>>
>>
>> Lmk so I can investigate further.
>>
>> Cheers!
>> Navina
>>
>> On 3/23/15, 6:43 PM, "Ash W Matheson" <ash.mathe...@gmail.com> wrote:
>>
>> >If anyone's interested, I've posted a diff of the project here:
>> >http://pastebin.com/6ZW6Y1Vu
>> >and the python publisher here: http://pastebin.com/2NvTFDFx
>> >
>> >if you want to take a stab at it.
>> >
>> >On Mon, Mar 23, 2015 at 6:04 PM, Ash W Matheson <ash.mathe...@gmail.com>
>> >wrote:
>> >
>> >> Ok, so very simple test, all running on a local machine, not across
>> >> networks and all in the hello-samza repo this time around.
>> >>
>> >> I've got the datapusher.py file set up to push data into localhost. One
>> >> event per second.
>> >> And a modified hello-samza where I've modified the
>> >> WikipediaParserStreamTask.java class to simply read what's there.
>> >>
>> >> Running them both now and I'm seeing in the stderr files
>> >> (deploy/yarn/logs/userlogs/application_XXXXX/container_YYYY/stderr) the
>> >> following:
>> >>
>> >> Exception in thread "main" org.apache.samza.system.SystemConsumersException: Cannot deserialize an incoming message.
>> >>     at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:293)
>> >>     at org.apache.samza.system.SystemConsumers.org$apache$samza$system$SystemConsumers$$poll(SystemConsumers.scala:260)
>> >>     at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
>> >>     at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
>> >>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> >>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> >>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >>     at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
>> >>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>> >>     at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
>> >>     at scala.collection.SetLike$class.map(SetLike.scala:93)
>> >>     at scala.collection.AbstractSet.map(Set.scala:47)
>> >>     at org.apache.samza.system.SystemConsumers.refresh(SystemConsumers.scala:276)
>> >>     at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:213)
>> >>     at org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(RunLoop.scala:81)
>> >>     at org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(RunLoop.scala:81)
>> >>     at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>> >>     at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>> >>     at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:80)
>> >>     at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>> >>     at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>> >>     at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
>> >>     at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
>> >>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
>> >>     at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
>> >>     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
>> >>     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>> >> Caused by: org.codehaus.jackson.JsonParseException: Unexpected character ('M' (code 77)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
>> >>  at [Source: [B@5454d285; line: 1, column: 2]
>> >>     at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
>> >>     at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:385)
>> >>     at org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(JsonParserMinimalBase.java:306)
>> >>     at org.codehaus.jackson.impl.Utf8StreamParser._handleUnexpectedValue(Utf8StreamParser.java:1581)
>> >>     at org.codehaus.jackson.impl.Utf8StreamParser._nextTokenNotInObject(Utf8StreamParser.java:436)
>> >>     at org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamParser.java:322)
>> >>     at org.codehaus.jackson.map.ObjectMapper._initForReading(ObjectMapper.java:2432)
>> >>     at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2389)
>> >>     at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1667)
>> >>     at org.apache.samza.serializers.JsonSerde.fromBytes(JsonSerde.scala:33)
>> >>     at org.apache.samza.serializers.SerdeManager.fromBytes(SerdeManager.scala:115)
>> >>     at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:290)
>> >>
>> >>
>> >> I changed the systems.kafka.samza.msg.serde=json to 'string' a while
>> >> back, but that caused a separate exception.  However that was many, MANY
>> >> attempts ago.
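
For reference, the two self-consistent setups look roughly like this — property names as in the stock hello-samza configs; the registry entries are an assumption if they aren't already in the properties file:

```properties
# Option A: plain-text messages (matches the python publisher)
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
systems.kafka.samza.msg.serde=string
# ...then in process(): String raw = (String) envelope.getMessage();

# Option B: JSON messages -> the serde hands process() a Map<String, Object>
# serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
# systems.kafka.samza.msg.serde=json
```

Mixing them — json serde on plain-sentence input, or string serde with the Map cast left in — produces exactly the two exceptions quoted in this thread.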
>> >>
>> >> On Mon, Mar 23, 2015 at 5:23 PM, Ash W Matheson <
>> ash.mathe...@gmail.com>
>> >> wrote:
>> >>
>> >>> Ahh, I was going to add it to the run-class.sh script.
>> >>>
>> >>> Yeah, it's already there by default:
>> >>>
>> >>>
>> >>> # Metrics
>> >>> metrics.reporters=snapshot,jmx
>> >>> metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
>> >>> metrics.reporter.snapshot.stream=kafka.metrics
>> >>> metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
>> >>>
>> >>> So, where would I see those metrics?
>> >>>
>> >>> On Mon, Mar 23, 2015 at 5:15 PM, Ash W Matheson
>> >>><ash.mathe...@gmail.com>
>> >>> wrote:
>> >>>
>> >>>> read: I'm a C++ programmer looking at Java for the first time in > 10
>> >>>> years
>> >>>>
>> >>>> On Mon, Mar 23, 2015 at 5:13 PM, Ash W Matheson
>> >>>><ash.mathe...@gmail.com>
>> >>>> wrote:
>> >>>>
>> >>>>> I'm assuming I have Jmx defined ... where would that get set?
>> >>>>>
>> >>>>> On Mon, Mar 23, 2015 at 5:08 PM, Chinmay Soman <
>> >>>>> chinmay.cere...@gmail.com> wrote:
>> >>>>>
>> >>>>>> Hey Ash,
>> >>>>>>
>> >>>>>> Can you see your job metrics (if you have the Jmx metrics defined) to
>> >>>>>> see if your job is actually doing anything? My only guess at this
>> >>>>>> point is the process method is not being called because somehow
>> >>>>>> there's no incoming data. I could be totally wrong of course.
>> >>>>>>
>> >>>>>> On Mon, Mar 23, 2015 at 4:28 PM, Ash W Matheson <
>> >>>>>> ash.mathe...@gmail.com>
>> >>>>>> wrote:
>> >>>>>>
>> >>>>>> > Just to be clear, here's what's changed from the default
>> >>>>>> > hello-samza repo:
>> >>>>>> >
>> >>>>>> > wikipedia-parser.properties==========================
>> >>>>>> > task.inputs=kafka.myTopic
>> >>>>>> > systems.kafka.consumer.zookeeper.connect=ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com:2181/
>> >>>>>> > systems.kafka.consumer.auto.offset.reset=smallest
>> >>>>>> >
>> >>>>>> > WikipediaParserStreamTask.java =====================
>> >>>>>> >   public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
>> >>>>>> >     Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
>> >>>>>> >     WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
>> >>>>>> >
>> >>>>>> >     try {
>> >>>>>> >       System.out.println(event.getRawEvent());
>> >>>>>> >       // Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
>> >>>>>> >
>> >>>>>> >       // parsedJsonObject.put("channel", event.getChannel());
>> >>>>>> >       // parsedJsonObject.put("source", event.getSource());
>> >>>>>> >       // parsedJsonObject.put("time", event.getTime());
>> >>>>>> >
>> >>>>>> >       // collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-edits"), parsedJsonObject));
>> >>>>>> >
>> >>>>>> > as well as the aforementioned changes to the log4j.xml file.
>> >>>>>> >
>> >>>>>> > The data pushed into the 'myTopic' topic is nothing more than a
>> >>>>>> > sentence.
>> >>>>>> >
>> >>>>>> >
>> >>>>>> > On Mon, Mar 23, 2015 at 4:16 PM, Ash W Matheson <
>> >>>>>> ash.mathe...@gmail.com>
>> >>>>>> > wrote:
>> >>>>>> >
>> >>>>>> > > yep, modified log4j.xml to look like this:
>> >>>>>> > >
>> >>>>>> > >   <root>
>> >>>>>> > >     <priority value="debug" />
>> >>>>>> > >     <appender-ref ref="RollingAppender"/>
>> >>>>>> > >     <appender-ref ref="jmx" />
>> >>>>>> > >   </root>
>> >>>>>> > >
>> >>>>>> > > Not sure what you mean by #2.
>> >>>>>> > >
>> >>>>>> > > However, I'm running now, not seeing any exceptions, but still
>> >>>>>> > > not seeing any output from System.out.println(...)
>> >>>>>> > >
>> >>>>>> > > On Mon, Mar 23, 2015 at 11:29 AM, Naveen Somasundaram <
>> >>>>>> > > nsomasunda...@linkedin.com.invalid> wrote:
>> >>>>>> > >
>> >>>>>> > >> Hey Ash,
>> >>>>>> > >>                1. Did you happen to modify your log4j.xml ?
>> >>>>>> > >>                2. Can you print the class path that was printed
>> >>>>>> > >> when the job started? I am wondering if log4j was not loaded or
>> >>>>>> > >> not present in the path where it's looking. If you have been
>> >>>>>> > >> using hello-samza, it should have pulled it from Maven.
>> >>>>>> > >>
>> >>>>>> > >> Thanks,
>> >>>>>> > >> Naveen
>> >>>>>> > >>
>> >>>>>> > >> On Mar 22, 2015, at 10:35 AM, Ash W Matheson <
>> >>>>>> ash.mathe...@gmail.com>
>> >>>>>> > >> wrote:
>> >>>>>> > >>
>> >>>>>> > >> > Hey all,
>> >>>>>> > >> >
>> >>>>>> > >> > Evaluating Samza currently and am running into some odd issues.
>> >>>>>> > >> >
>> >>>>>> > >> > I'm currently working off the 'hello-samza' repo and trying to
>> >>>>>> > >> > parse a simple kafka topic that I've produced through an
>> >>>>>> > >> > external java app (nothing other than a series of sentences)
>> >>>>>> > >> > and it's failing pretty hard for me. The base 'hello-samza' set
>> >>>>>> > >> > of apps works fine, but as soon as I change the configuration
>> >>>>>> > >> > to look at a different Kafka/zookeeper I get the following in
>> >>>>>> > >> > the userlogs:
>> >>>>>> > >> >
>> >>>>>> > >> > 2015-03-22 17:07:09 KafkaSystemAdmin [WARN] Unable to fetch
>> >>>>>> > >> > last offsets for streams [myTopic] due to
>> >>>>>> > >> > kafka.common.KafkaException: fetching topic metadata for topics
>> >>>>>> > >> > [Set(myTopic)] from broker
>> >>>>>> > >> > [ArrayBuffer(id:0,host:redacted,port:9092)] failed. Retrying.
>> >>>>>> > >> >
>> >>>>>> > >> >
>> >>>>>> > >> > The modifications are pretty straightforward.  In the
>> >>>>>> > >> > wikipedia-parser.properties, I've changed the following:
>> >>>>>> > >> > task.inputs=kafka.myTopic
>> >>>>>> > >> > systems.kafka.consumer.zookeeper.connect=redacted:2181/
>> >>>>>> > >> > systems.kafka.consumer.auto.offset.reset=smallest
>> >>>>>> > >> > systems.kafka.producer.metadata.broker.list=redacted:9092
>> >>>>>> > >> >
>> >>>>>> > >> > and in the actual java file WikipediaParserStreamTask.java
>> >>>>>> > >> >  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
>> >>>>>> > >> >    Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
>> >>>>>> > >> >    WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
>> >>>>>> > >> >
>> >>>>>> > >> >    try {
>> >>>>>> > >> >        System.out.println(event.getRawEvent());
>> >>>>>> > >> >
>> >>>>>> > >> > And then following the compile/extract/run process outlined in
>> >>>>>> > >> > the hello-samza website.
>> >>>>>> > >> >
>> >>>>>> > >> > Any thoughts?  I've looked online for any 'super simple'
>> >>>>>> > >> > examples of ingesting kafka in samza with very little success.
>> >>>>>> > >>
>> >>>>>> > >>
>> >>>>>> > >
>> >>>>>> >
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>> --
>> >>>>>> Thanks and regards
>> >>>>>>
>> >>>>>> Chinmay Soman
>> >>>>>>
>> >>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>
>>
>>
