If anyone's interested, I've posted a diff of the project here: http://pastebin.com/6ZW6Y1Vu and the python publisher here: http://pastebin.com/2NvTFDFx
if you want to take a stab at it.

On Mon, Mar 23, 2015 at 6:04 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:

> Ok, so a very simple test: all running on a local machine, not across
> networks, and all in the hello-samza repo this time around.
>
> I've got the datapusher.py file set up to push data into localhost, one
> event per second, and a modified hello-samza where I've changed the
> WikipediaParserStreamTask.java class to simply read what's there.
>
> Running them both now, I'm seeing the following in the stderr files
> (deploy/yarn/logs/userlogs/application_XXXXX/container_YYYY/stderr):
>
> Exception in thread "main" org.apache.samza.system.SystemConsumersException: Cannot deserialize an incoming message.
>     at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:293)
>     at org.apache.samza.system.SystemConsumers.org$apache$samza$system$SystemConsumers$$poll(SystemConsumers.scala:260)
>     at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
>     at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>     at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>     at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
>     at scala.collection.SetLike$class.map(SetLike.scala:93)
>     at scala.collection.AbstractSet.map(Set.scala:47)
>     at org.apache.samza.system.SystemConsumers.refresh(SystemConsumers.scala:276)
>     at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:213)
>     at org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(RunLoop.scala:81)
>     at org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(RunLoop.scala:81)
>     at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>     at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>     at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:80)
>     at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>     at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>     at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
>     at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
>     at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
>     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
>     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> Caused by: org.codehaus.jackson.JsonParseException: Unexpected character ('M' (code 77)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
>  at [Source: [B@5454d285; line: 1, column: 2]
>     at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
>     at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:385)
>     at org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(JsonParserMinimalBase.java:306)
>     at org.codehaus.jackson.impl.Utf8StreamParser._handleUnexpectedValue(Utf8StreamParser.java:1581)
>     at org.codehaus.jackson.impl.Utf8StreamParser._nextTokenNotInObject(Utf8StreamParser.java:436)
>     at org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamParser.java:322)
>     at org.codehaus.jackson.map.ObjectMapper._initForReading(ObjectMapper.java:2432)
>     at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2389)
>     at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1667)
>     at org.apache.samza.serializers.JsonSerde.fromBytes(JsonSerde.scala:33)
>     at org.apache.samza.serializers.SerdeManager.fromBytes(SerdeManager.scala:115)
>     at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:290)
>
> I changed systems.kafka.samza.msg.serde=json to 'string' a while back,
> but that caused a separate exception. However, that was many, MANY
> attempts ago.
>
> On Mon, Mar 23, 2015 at 5:23 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
>
>> Ahh, I was going to add it to the run-class.sh script.
>>
>> Yeah, it's already there by default:
>>
>> # Metrics
>> metrics.reporters=snapshot,jmx
>> metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
>> metrics.reporter.snapshot.stream=kafka.metrics
>> metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
>>
>> So, where would I see those metrics?
>>
>> On Mon, Mar 23, 2015 at 5:15 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
>>
>>> (read: I'm a C++ programmer looking at Java for the first time in
>>> more than 10 years)
>>>
>>> On Mon, Mar 23, 2015 at 5:13 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
>>>
>>>> I'm assuming I have Jmx defined ... where would that get set?
>>>>
>>>> On Mon, Mar 23, 2015 at 5:08 PM, Chinmay Soman <chinmay.cere...@gmail.com> wrote:
>>>>
>>>>> Hey Ash,
>>>>>
>>>>> Can you check your job metrics (if you have the Jmx metrics defined)
>>>>> to see if your job is actually doing anything? My only guess at this
>>>>> point is that the process method is not being called because somehow
>>>>> there's no incoming data. I could be totally wrong, of course.
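The JsonParseException in the stack trace above is Samza's default JsonSerde (Jackson underneath) choking on message bytes that aren't valid JSON: the publisher is sending a bare sentence, and 'M' at column 2 is its first letter, not the start of a JSON value. A minimal sketch of the mismatch, using Python's stdlib json module to stand in for JsonSerde (the sample sentence and the "raw" key are illustrative assumptions, not taken from the pastebin):

```python
import json

# A bare sentence, like what datapusher.py sends. Not valid JSON.
raw_sentence = "My test sentence"

# Decoding the raw bytes fails the same way Jackson does above: the
# first character 'M' cannot start a JSON value.
try:
    json.loads(raw_sentence)
    decoded_ok = True
except ValueError:
    decoded_ok = False

# JSON-encoding the sentence first makes it deserializable again,
# which is what a json msg.serde expects of every message.
wrapped = json.dumps({"raw": raw_sentence})
roundtrip = json.loads(wrapped)
```

The alternative path mentioned in the thread (switching systems.kafka.samza.msg.serde to string) would instead make Samza hand the task the raw bytes as a String, at which point the `(Map<String, Object>) envelope.getMessage()` cast in the task would be the thing to change.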
>>>>>
>>>>> On Mon, Mar 23, 2015 at 4:28 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
>>>>>
>>>>>> Just to be clear, here's what's changed from the default hello-samza repo:
>>>>>>
>>>>>> wikipedia-parser.properties ==========================
>>>>>> task.inputs=kafka.myTopic
>>>>>> systems.kafka.consumer.zookeeper.connect=ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com:2181/
>>>>>> systems.kafka.consumer.auto.offset.reset=smallest
>>>>>>
>>>>>> WikipediaParserStreamTask.java =====================
>>>>>> public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
>>>>>>   Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
>>>>>>   WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
>>>>>>
>>>>>>   try {
>>>>>>     System.out.println(event.getRawEvent());
>>>>>>     // Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
>>>>>>
>>>>>>     // parsedJsonObject.put("channel", event.getChannel());
>>>>>>     // parsedJsonObject.put("source", event.getSource());
>>>>>>     // parsedJsonObject.put("time", event.getTime());
>>>>>>
>>>>>>     // collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-edits"), parsedJsonObject));
>>>>>>
>>>>>> as well as the aforementioned changes to the log4j.xml file.
>>>>>>
>>>>>> The data pushed into the 'myTopic' topic is nothing more than a sentence.
>>>>>>
>>>>>> On Mon, Mar 23, 2015 at 4:16 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
>>>>>>
>>>>>>> yep, modified log4j.xml to look like this:
>>>>>>>
>>>>>>> <root>
>>>>>>>   <priority value="debug" />
>>>>>>>   <appender-ref ref="RollingAppender"/>
>>>>>>>   <appender-ref ref="jmx" />
>>>>>>> </root>
>>>>>>>
>>>>>>> Not sure what you mean by #2.
>>>>>>>
>>>>>>> However, I'm running now, not seeing any exceptions, but still not
>>>>>>> seeing any output from System.out.println(...)
>>>>>>>
>>>>>>> On Mon, Mar 23, 2015 at 11:29 AM, Naveen Somasundaram <nsomasunda...@linkedin.com.invalid> wrote:
>>>>>>>
>>>>>>>> Hey Ash,
>>>>>>>> 1. Did you happen to modify your log4j.xml?
>>>>>>>> 2. Can you print the class path that was printed when the job
>>>>>>>> started? I am wondering if log4j was not loaded or not present in
>>>>>>>> the path where it's looking. If you have been using hello-samza,
>>>>>>>> it should have pulled it from Maven.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Naveen
>>>>>>>>
>>>>>>>> On Mar 22, 2015, at 10:35 AM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hey all,
>>>>>>>>>
>>>>>>>>> I'm evaluating Samza currently and am running into some odd issues.
>>>>>>>>>
>>>>>>>>> I'm currently working off the 'hello-samza' repo and trying to
>>>>>>>>> parse a simple kafka topic that I've produced through an external
>>>>>>>>> java app (nothing other than a series of sentences), and it's
>>>>>>>>> failing pretty hard for me. The base 'hello-samza' set of apps
>>>>>>>>> works fine, but as soon as I change the configuration to look at a
>>>>>>>>> different Kafka/zookeeper I get the following in the userlogs:
>>>>>>>>>
>>>>>>>>> 2015-03-22 17:07:09 KafkaSystemAdmin [WARN] Unable to fetch last
>>>>>>>>> offsets for streams [myTopic] due to kafka.common.KafkaException:
>>>>>>>>> fetching topic metadata for topics [Set(myTopic)] from broker
>>>>>>>>> [ArrayBuffer(id:0,host:redacted,port:9092)] failed. Retrying.
>>>>>>>>>
>>>>>>>>> The modifications are pretty straightforward. In
>>>>>>>>> wikipedia-parser.properties, I've changed the following:
>>>>>>>>> task.inputs=kafka.myTopic
>>>>>>>>> systems.kafka.consumer.zookeeper.connect=redacted:2181/
>>>>>>>>> systems.kafka.consumer.auto.offset.reset=smallest
>>>>>>>>> systems.kafka.producer.metadata.broker.list=redacted:9092
>>>>>>>>>
>>>>>>>>> and in the actual java file, WikipediaParserStreamTask.java:
>>>>>>>>> public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
>>>>>>>>>   Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
>>>>>>>>>   WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
>>>>>>>>>
>>>>>>>>>   try {
>>>>>>>>>     System.out.println(event.getRawEvent());
>>>>>>>>>
>>>>>>>>> And then I'm following the compile/extract/run process outlined on
>>>>>>>>> the hello-samza website.
>>>>>>>>>
>>>>>>>>> Any thoughts? I've looked online for any 'super simple' examples
>>>>>>>>> of ingesting kafka in samza with very little success.
>>>>>
>>>>> --
>>>>> Thanks and regards
>>>>>
>>>>> Chinmay Soman
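Since the thread never posts a resolution: one plausible way out of the serde mismatch, given the task casts envelope.getMessage() to a Map, is for the publisher to JSON-encode each event before sending, so that the default json msg.serde can decode it. This is a sketch only; the key names ("raw", "time", "channel", "source") are assumptions inferred from the WikipediaFeedEvent usage quoted above, and the commented-out send uses kafka-python, which may differ from the datapusher.py in the pastebin:

```python
import json
import time

def encode_event(sentence, channel="#en.wikipedia"):
    # Wrap a plain sentence in a JSON object so Samza's JsonSerde can
    # deserialize it into a Map. Key names here are assumptions, not
    # confirmed against WikipediaFeedEvent's constructor.
    event = {
        "raw": sentence,
        "time": int(time.time() * 1000),
        "channel": channel,
        "source": "datapusher",
    }
    return json.dumps(event).encode("utf-8")

# Hypothetical send with kafka-python (not part of the original script):
# from kafka import KafkaProducer
# producer = KafkaProducer(bootstrap_servers="localhost:9092")
# producer.send("myTopic", encode_event("a plain sentence"))

payload = encode_event("a plain sentence")
```

With messages shaped like this, the task's `(Map<String, Object>) envelope.getMessage()` cast succeeds instead of throwing the JsonParseException from earlier in the thread.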