Hi Ash,
I just ran wikipedia-parser with your patch. Looks like you have set the
message serde correctly in the configs. However, the original code still
converts it into a Map for consumption in the WikipediaFeedEvent.
I am seeing the following (expected):

2015-03-23 19:17:49 SamzaContainerExceptionHandler [ERROR] Uncaught
exception in thread (name=main). Exiting process now.
java.lang.ClassCastException: java.lang.String cannot be cast to
java.util.Map
 at 
samza.examples.wikipedia.task.WikipediaParserStreamTask.process(WikipediaPa
rserStreamTask.java:38)
 at 
org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(Tas
kInstance.scala:133)

Did you make the changes to fix this error? Your patch doesn¹t seem to
have that. 
Line 38 Map<String, Object> jsonObject = (Map<String, Object>)
envelope.getMessage();



Lmk so I can investigate further.

Cheers!
Navina

On 3/23/15, 6:43 PM, "Ash W Matheson" <ash.mathe...@gmail.com> wrote:

>If anyone's interested, I've posted a diff of the project here:
>http://pastebin.com/6ZW6Y1Vu
>and the python publisher here: http://pastebin.com/2NvTFDFx
>
>if you want to take a stab at it.
>
>On Mon, Mar 23, 2015 at 6:04 PM, Ash W Matheson <ash.mathe...@gmail.com>
>wrote:
>
>> Ok, so very simple test, all running on a local machine, not across
>> networks and all in the hello-samza repo this time around.
>>
>> I've got the datapusher.py file set up to push data into localhost. One
>> event per second.
>> And a modified hello-samza where I've modified the
>> WikipediaParserStreamTask.java class to simply read what's there.
>>
>> Running them both now and I'm seeing in the stderr files
>> (deploy/yarn/logs/userlogs/application_XXXXX/container_YYYY/stderr) the
>> following:
>>
>> Exception in thread "main"
>> org.apache.samza.system.SystemConsumersException: Cannot deserialize an
>> incoming message.
>>     at
>> 
>>org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:293)
>>     at org.apache.samza.system.SystemConsumers.org
>> $apache$samza$system$SystemConsumers$$poll(SystemConsumers.scala:260)
>>     at
>> 
>>org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemCo
>>nsumers.scala:276)
>>     at
>> 
>>org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemCo
>>nsumers.scala:276)
>>     at
>> 
>>scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.sca
>>la:244)
>>     at
>> 
>>scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.sca
>>la:244)
>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>     at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
>>     at
>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>     at
>> 
>>scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scal
>>a:47)
>>     at scala.collection.SetLike$class.map(SetLike.scala:93)
>>     at scala.collection.AbstractSet.map(Set.scala:47)
>>     at
>> 
>>org.apache.samza.system.SystemConsumers.refresh(SystemConsumers.scala:276
>>)
>>     at
>> 
>>org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:213)
>>     at
>> 
>>org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(Ru
>>nLoop.scala:81)
>>     at
>> 
>>org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(Ru
>>nLoop.scala:81)
>>     at
>> org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>>     at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>>     at
>> 
>>org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala
>>:80)
>>     at
>> org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>>     at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>>     at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
>>     at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
>>     at
>> org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
>>     at
>> 
>>org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:
>>108)
>>     at
>> org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
>>     at 
>>org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>> Caused by: org.codehaus.jackson.JsonParseException: Unexpected character
>> ('M' (code 77)): expected a valid value (number, String, array, object,
>> 'true', 'false' or 'null')
>>  at [Source: [B@5454d285; line: 1, column: 2]
>>     at
>> org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
>>     at
>> 
>>org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMi
>>nimalBase.java:385)
>>     at
>> 
>>org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(Jso
>>nParserMinimalBase.java:306)
>>     at
>> 
>>org.codehaus.jackson.impl.Utf8StreamParser._handleUnexpectedValue(Utf8Str
>>eamParser.java:1581)
>>     at
>> 
>>org.codehaus.jackson.impl.Utf8StreamParser._nextTokenNotInObject(Utf8Stre
>>amParser.java:436)
>>     at
>> 
>>org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamParser.jav
>>a:322)
>>     at
>> 
>>org.codehaus.jackson.map.ObjectMapper._initForReading(ObjectMapper.java:2
>>432)
>>     at
>> 
>>org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:
>>2389)
>>     at
>> org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1667)
>>     at 
>>org.apache.samza.serializers.JsonSerde.fromBytes(JsonSerde.scala:33)
>>     at
>> 
>>org.apache.samza.serializers.SerdeManager.fromBytes(SerdeManager.scala:11
>>5)
>>     at
>> 
>>org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:290)
>>
>>
>> I changed the systems.kafka.samza.msg.serde=json to 'string' a while
>>back,
>> but that caused a separate exception.  However that was many, MANY
>>attempts
>> ago.
>>
>> On Mon, Mar 23, 2015 at 5:23 PM, Ash W Matheson <ash.mathe...@gmail.com>
>> wrote:
>>
>>> Ahh, I was going to add it to the run-class.sh script.
>>>
>>> Yeah, it's already there by default:
>>>
>>>
>>> # Metrics
>>> metrics.reporters=snapshot,jmx
>>>
>>> 
>>>metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.Metric
>>>sSnapshotReporterFactory
>>> metrics.reporter.snapshot.stream=kafka.metrics
>>>
>>> 
>>>metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporter
>>>Factory
>>>
>>> So, where would I see those metrics?
>>>
>>> On Mon, Mar 23, 2015 at 5:15 PM, Ash W Matheson
>>><ash.mathe...@gmail.com>
>>> wrote:
>>>
>>>> read: I'm a C++ programmer looking at Java for the first time in > 10
>>>> years
>>>>
>>>> On Mon, Mar 23, 2015 at 5:13 PM, Ash W Matheson
>>>><ash.mathe...@gmail.com>
>>>> wrote:
>>>>
>>>>> I'm assuming I have Jmx defined ... where would that get set?
>>>>>
>>>>> On Mon, Mar 23, 2015 at 5:08 PM, Chinmay Soman <
>>>>> chinmay.cere...@gmail.com> wrote:
>>>>>
>>>>>> Hey Ash,
>>>>>>
>>>>>> Can you see your job metrics (if you have the Jmx metrics defined)
>>>>>>to
>>>>>> see
>>>>>> if your job is actually doing anything ? My only guess at this point
>>>>>> is the
>>>>>> process method is not being called because somehow there's no
>>>>>>incoming
>>>>>> data. I could be totally wrong of course.
>>>>>>
>>>>>> On Mon, Mar 23, 2015 at 4:28 PM, Ash W Matheson <
>>>>>> ash.mathe...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> > Just to be clear, here's what's changed from the default
>>>>>>hello-samza
>>>>>> repo:
>>>>>> >
>>>>>> > wikipedia-parser.properties==========================
>>>>>> > task.inputs=kafka.myTopic
>>>>>> > systems.kafka.consumer.zookeeper.connect=
>>>>>> > ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com:2181/
>>>>>> > systems.kafka.consumer.auto.offset.reset=smallest
>>>>>> >
>>>>>> > WikipediaParserStreamTask.java =====================
>>>>>> >   public void process(IncomingMessageEnvelope envelope,
>>>>>> MessageCollector
>>>>>> > collector, TaskCoordinator coordinator) {
>>>>>> >     Map<String, Object> jsonObject = (Map<String, Object>)
>>>>>> > envelope.getMessage();
>>>>>> >     WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
>>>>>> >
>>>>>> >     try {
>>>>>> >       System.out.println(event.getRawEvent());
>>>>>> >       // Map<String, Object> parsedJsonObject =
>>>>>> parse(event.getRawEvent());
>>>>>> >
>>>>>> >       // parsedJsonObject.put("channel", event.getChannel());
>>>>>> >       // parsedJsonObject.put("source", event.getSource());
>>>>>> >       // parsedJsonObject.put("time", event.getTime());
>>>>>> >
>>>>>> >       // collector.send(new OutgoingMessageEnvelope(new
>>>>>> > SystemStream("kafka", "wikipedia-edits"), parsedJsonObject));
>>>>>> >
>>>>>> > as well as the aforementioned changes to the log4j.xml file.
>>>>>> >
>>>>>> > The data pushed into the 'myTopic' topic is nothing more than a
>>>>>> sentence.
>>>>>> >
>>>>>> >
>>>>>> > On Mon, Mar 23, 2015 at 4:16 PM, Ash W Matheson <
>>>>>> ash.mathe...@gmail.com>
>>>>>> > wrote:
>>>>>> >
>>>>>> > > yep, modified log4j.xml to look like this:
>>>>>> > >
>>>>>> > >   <root>
>>>>>> > >     <priority value="debug" />
>>>>>> > >     <appender-ref ref="RollingAppender"/>
>>>>>> > >     <appender-ref ref="jmx" />
>>>>>> > >   </root>
>>>>>> > >
>>>>>> > > Not sure what you mean by #2.
>>>>>> > >
>>>>>> > > However, I'm running now, not seeing any exceptions, but still
>>>>>>not
>>>>>> seeing
>>>>>> > > any output from System.out.println(...)
>>>>>> > >
>>>>>> > > On Mon, Mar 23, 2015 at 11:29 AM, Naveen Somasundaram <
>>>>>> > > nsomasunda...@linkedin.com.invalid> wrote:
>>>>>> > >
>>>>>> > >> Hey Ash,
>>>>>> > >>                1. Did you happen to modify your log4j.xml ?
>>>>>> > >>                2. Can you print the class path that was printed
>>>>>> when the
>>>>>> > >> job started ? I am wondering if log4j was not loaded or not
>>>>>> present in
>>>>>> > the
>>>>>> > >> path where it¹s looking for. If you have been using hello
>>>>>>samza,
>>>>>> it
>>>>>> > should
>>>>>> > >> have pulled it from Maven.
>>>>>> > >>
>>>>>> > >> Thanks,
>>>>>> > >> Naveen
>>>>>> > >>
>>>>>> > >> On Mar 22, 2015, at 10:35 AM, Ash W Matheson <
>>>>>> ash.mathe...@gmail.com>
>>>>>> > >> wrote:
>>>>>> > >>
>>>>>> > >> > Hey all,
>>>>>> > >> >
>>>>>> > >> > Evaluating Samza currently and am running into some odd
>>>>>>issues.
>>>>>> > >> >
>>>>>> > >> > I'm currently working off the 'hello-samza' repo and trying
>>>>>>to
>>>>>> parse a
>>>>>> > >> > simple kafka topic that I've produced through an extenal java
>>>>>> app
>>>>>> > >> (nothing
>>>>>> > >> > other than a series of sentences) and it's failing pretty
>>>>>>hard
>>>>>> for me.
>>>>>> > >> The
>>>>>> > >> > base 'hello-samza' set of apps works fine, but as soon as I
>>>>>> change the
>>>>>> > >> > configuration to look at a different Kafka/zookeeper I get
>>>>>>the
>>>>>> > >> following in
>>>>>> > >> > the userlogs:
>>>>>> > >> >
>>>>>> > >> > 2015-03-22 17:07:09 KafkaSystemAdmin [WARN] Unable to fetch
>>>>>>last
>>>>>> > offsets
>>>>>> > >> > for streams [myTopic] due to kafka.common.KafkaException:
>>>>>> fetching
>>>>>> > topic
>>>>>> > >> > metadata for topics [Set(myTopic)] from broker
>>>>>> > >> > [ArrayBuffer(id:0,host:redacted,port:9092)] failed. Retrying.
>>>>>> > >> >
>>>>>> > >> >
>>>>>> > >> > The modifications are pretty straightforward.  In the
>>>>>> > >> > Wikipedia-parser.properties, I've changed the following:
>>>>>> > >> > task.inputs=kafka.myTopic
>>>>>> > >> > systems.kafka.consumer.zookeeper.connect=redacted:2181/
>>>>>> > >> > systems.kafka.consumer.auto.offset.reset=smallest
>>>>>> > >> > systems.kafka.producer.metadata.broker.list=redacted:9092
>>>>>> > >> >
>>>>>> > >> > and in the actual java file WikipediaParserStreamTask.java
>>>>>> > >> >  public void process(IncomingMessageEnvelope envelope,
>>>>>> > MessageCollector
>>>>>> > >> > collector, TaskCoordinator coordinator) {
>>>>>> > >> >    Map<String, Object> jsonObject = (Map<String, Object>)
>>>>>> > >> > envelope.getMessage();
>>>>>> > >> >    WikipediaFeedEvent event = new
>>>>>> WikipediaFeedEvent(jsonObject);
>>>>>> > >> >
>>>>>> > >> >    try {
>>>>>> > >> >        System.out.println(event.getRawEvent());
>>>>>> > >> >
>>>>>> > >> > And then following the compile/extract/run process outlined
>>>>>>in
>>>>>> the
>>>>>> > >> > hello-samza website.
>>>>>> > >> >
>>>>>> > >> > Any thoughts?  I've looked online for any 'super simple'
>>>>>> examples of
>>>>>> > >> > ingesting kafka in samza with very little success.
>>>>>> > >>
>>>>>> > >>
>>>>>> > >
>>>>>> >
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Thanks and regards
>>>>>>
>>>>>> Chinmay Soman
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>

Reply via email to