>I changed the systems.kafka.samza.msg.serde=json to 'string' a while back, but that caused a separate exception. However that was many, MANY attempts ago.
This may not work because that will set all serialization formats (input and output) to json / string. In your case you're inputting string and outputting json. So you might have to set that explicitly.

On Mon, Mar 23, 2015 at 9:24 PM, Chinmay Soman <chinmay.cere...@gmail.com> wrote:

> Since you're producing String data to 'myTopic', can you try setting the
> string serialization in your config?
>
> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
> systems.kafka.streams.myTopic.samza.msg.serde=string
>
> On Mon, Mar 23, 2015 at 9:17 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
>
>> more info - new exception message:
>>
>> Exception in thread "main" org.apache.samza.system.SystemConsumersException: Cannot deserialize an incoming message.
>>
>> Updated the diff in pastebin with the changes.
>>
>> On Mon, Mar 23, 2015 at 8:41 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
>>
>>> Gah! Yeah, those were gone several revisions ago but didn't get nuked in
>>> the last iteration.
>>>
>>> OK, let me do a quick test to see if that was my problem all along.
>>>
>>> On Mon, Mar 23, 2015 at 8:38 PM, Navina Ramesh <nram...@linkedin.com.invalid> wrote:
>>>
>>>> Hey Ash,
>>>> I was referring to the lines before the try block.
>>>>
>>>> Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
>>>> WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
>>>>
>>>> try {
>>>>   System.out.println("[DWH] should see this");
>>>>   System.out.println(event.getRawEvent());
>>>>   …
>>>>
>>>> Did you remove those lines as well?
>>>>
>>>> Navina
>>>>
>>>> On 3/23/15, 8:31 PM, "Ash W Matheson" <ash.mathe...@gmail.com> wrote:
>>>>
>>>>> Just looking at the diff I posted and it's:
>>>>>
>>>>>   try {
>>>>> -   Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
>>>>> +   System.out.println("[DWH] should see this");
>>>>> +   System.out.println(event.getRawEvent());
>>>>> +   // Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
>>>>>
>>>>> I've removed the Map and added two System.out.println calls. So no, there
>>>>> shouldn't be any reference to
>>>>> Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
>>>>> in the source java file.
>>>>>
>>>>> On Mon, Mar 23, 2015 at 7:42 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
>>>>>
>>>>>> I'm in transit right now but if memory serves me everything should be
>>>>>> commented out of that method except for the System.out.println call.
>>>>>> I'll be home shortly and can confirm.
>>>>>>
>>>>>> On Mar 23, 2015 7:28 PM, "Navina Ramesh" <nram...@linkedin.com.invalid> wrote:
>>>>>>
>>>>>>> Hi Ash,
>>>>>>> I just ran wikipedia-parser with your patch. Looks like you have set the
>>>>>>> message serde correctly in the configs. However, the original code still
>>>>>>> converts it into a Map for consumption in the WikipediaFeedEvent.
>>>>>>> I am seeing the following (expected):
>>>>>>>
>>>>>>> 2015-03-23 19:17:49 SamzaContainerExceptionHandler [ERROR] Uncaught exception in thread (name=main). Exiting process now.
>>>>>>> java.lang.ClassCastException: java.lang.String cannot be cast to java.util.Map
>>>>>>>     at samza.examples.wikipedia.task.WikipediaParserStreamTask.process(WikipediaParserStreamTask.java:38)
>>>>>>>     at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:133)
>>>>>>>
>>>>>>> Did you make the changes to fix this error? Your patch doesn't seem to
>>>>>>> have that.
>>>>>>> Line 38 Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
>>>>>>>
>>>>>>> Lmk so I can investigate further.
>>>>>>>
>>>>>>> Cheers!
>>>>>>> Navina
>>>>>>>
>>>>>>> On 3/23/15, 6:43 PM, "Ash W Matheson" <ash.mathe...@gmail.com> wrote:
>>>>>>>
>>>>>>>> If anyone's interested, I've posted a diff of the project here:
>>>>>>>> http://pastebin.com/6ZW6Y1Vu
>>>>>>>> and the python publisher here: http://pastebin.com/2NvTFDFx
>>>>>>>>
>>>>>>>> if you want to take a stab at it.
>>>>>>>>
>>>>>>>> On Mon, Mar 23, 2015 at 6:04 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Ok, so very simple test, all running on a local machine, not across
>>>>>>>>> networks and all in the hello-samza repo this time around.
>>>>>>>>>
>>>>>>>>> I've got the datapusher.py file set up to push data into localhost. One
>>>>>>>>> event per second.
>>>>>>>>> And a modified hello-samza where I've modified the
>>>>>>>>> WikipediaParserStreamTask.java class to simply read what's there.
>>>>>>>>>
>>>>>>>>> Running them both now and I'm seeing in the stderr files
>>>>>>>>> (deploy/yarn/logs/userlogs/application_XXXXX/container_YYYY/stderr) the
>>>>>>>>> following:
>>>>>>>>>
>>>>>>>>> Exception in thread "main" org.apache.samza.system.SystemConsumersException: Cannot deserialize an incoming message.
>>>>>>>>>     at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:293)
>>>>>>>>>     at org.apache.samza.system.SystemConsumers.org$apache$samza$system$SystemConsumers$$poll(SystemConsumers.scala:260)
>>>>>>>>>     at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
>>>>>>>>>     at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
>>>>>>>>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>>>>>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>>>>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>>>>     at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
>>>>>>>>>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>>>>>>>>     at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
>>>>>>>>>     at scala.collection.SetLike$class.map(SetLike.scala:93)
>>>>>>>>>     at scala.collection.AbstractSet.map(Set.scala:47)
>>>>>>>>>     at org.apache.samza.system.SystemConsumers.refresh(SystemConsumers.scala:276)
>>>>>>>>>     at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:213)
>>>>>>>>>     at org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(RunLoop.scala:81)
>>>>>>>>>     at org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(RunLoop.scala:81)
>>>>>>>>>     at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>>>>>>>>>     at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>>>>>>>>>     at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:80)
>>>>>>>>>     at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>>>>>>>>>     at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>>>>>>>>>     at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
>>>>>>>>>     at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
>>>>>>>>>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
>>>>>>>>>     at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
>>>>>>>>>     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
>>>>>>>>>     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>>>>>>>>> Caused by: org.codehaus.jackson.JsonParseException: Unexpected character ('M' (code 77)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
>>>>>>>>>  at [Source: [B@5454d285; line: 1, column: 2]
>>>>>>>>>     at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
>>>>>>>>>     at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:385)
>>>>>>>>>     at org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(JsonParserMinimalBase.java:306)
>>>>>>>>>     at org.codehaus.jackson.impl.Utf8StreamParser._handleUnexpectedValue(Utf8StreamParser.java:1581)
>>>>>>>>>     at org.codehaus.jackson.impl.Utf8StreamParser._nextTokenNotInObject(Utf8StreamParser.java:436)
>>>>>>>>>     at org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamParser.java:322)
>>>>>>>>>     at org.codehaus.jackson.map.ObjectMapper._initForReading(ObjectMapper.java:2432)
>>>>>>>>>     at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2389)
>>>>>>>>>     at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1667)
>>>>>>>>>     at org.apache.samza.serializers.JsonSerde.fromBytes(JsonSerde.scala:33)
>>>>>>>>>     at org.apache.samza.serializers.SerdeManager.fromBytes(SerdeManager.scala:115)
>>>>>>>>>     at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:290)
>>>>>>>>>
>>>>>>>>> I changed the systems.kafka.samza.msg.serde=json to 'string' a while back,
>>>>>>>>> but that caused a separate exception. However that was many, MANY attempts
>>>>>>>>> ago.
>>>>>>>>>
>>>>>>>>> On Mon, Mar 23, 2015 at 5:23 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Ahh, I was going to add it to the run-class.sh script.
>>>>>>>>>>
>>>>>>>>>> Yeah, it's already there by default:
>>>>>>>>>>
>>>>>>>>>> # Metrics
>>>>>>>>>> metrics.reporters=snapshot,jmx
>>>>>>>>>> metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
>>>>>>>>>> metrics.reporter.snapshot.stream=kafka.metrics
>>>>>>>>>> metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
>>>>>>>>>>
>>>>>>>>>> So, where would I see those metrics?
>>>>>>>>>>
>>>>>>>>>> On Mon, Mar 23, 2015 at 5:15 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> read: I'm a C++ programmer looking at Java for the first time in 10
>>>>>>>>>>> years
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Mar 23, 2015 at 5:13 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I'm assuming I have Jmx defined ... where would that get set?
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Mar 23, 2015 at 5:08 PM, Chinmay Soman <chinmay.cere...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hey Ash,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Can you see your job metrics (if you have the Jmx metrics defined) to
>>>>>>>>>>>>> see if your job is actually doing anything? My only guess at this point
>>>>>>>>>>>>> is the process method is not being called because somehow there's no
>>>>>>>>>>>>> incoming data. I could be totally wrong of course.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Mar 23, 2015 at 4:28 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Just to be clear, here's what's changed from the default hello-samza
>>>>>>>>>>>>>> repo:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> wikipedia-parser.properties==========================
>>>>>>>>>>>>>> task.inputs=kafka.myTopic
>>>>>>>>>>>>>> systems.kafka.consumer.zookeeper.connect=ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com:2181/
>>>>>>>>>>>>>> systems.kafka.consumer.auto.offset.reset=smallest
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> WikipediaParserStreamTask.java =====================
>>>>>>>>>>>>>> public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
>>>>>>>>>>>>>>   Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
>>>>>>>>>>>>>>   WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>   try {
>>>>>>>>>>>>>>     System.out.println(event.getRawEvent());
>>>>>>>>>>>>>>     // Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     // parsedJsonObject.put("channel", event.getChannel());
>>>>>>>>>>>>>>     // parsedJsonObject.put("source", event.getSource());
>>>>>>>>>>>>>>     // parsedJsonObject.put("time", event.getTime());
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     // collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-edits"), parsedJsonObject));
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> as well as the aforementioned changes to the log4j.xml file.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The data pushed into the 'myTopic' topic is nothing more than a
>>>>>>>>>>>>>> sentence.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Mar 23, 2015 at 4:16 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> yep, modified log4j.xml to look like this:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> <root>
>>>>>>>>>>>>>>>   <priority value="debug" />
>>>>>>>>>>>>>>>   <appender-ref ref="RollingAppender"/>
>>>>>>>>>>>>>>>   <appender-ref ref="jmx" />
>>>>>>>>>>>>>>> </root>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Not sure what you mean by #2.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> However, I'm running now, not seeing any exceptions, but still not
>>>>>>>>>>>>>>> seeing any output from System.out.println(...)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, Mar 23, 2015 at 11:29 AM, Naveen Somasundaram <nsomasunda...@linkedin.com.invalid> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hey Ash,
>>>>>>>>>>>>>>>> 1. Did you happen to modify your log4j.xml?
>>>>>>>>>>>>>>>> 2. Can you print the class path that was printed when the job
>>>>>>>>>>>>>>>> started? I am wondering if log4j was not loaded or not present in
>>>>>>>>>>>>>>>> the path where it's looking for. If you have been using hello
>>>>>>>>>>>>>>>> samza, it should have pulled it from Maven.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Naveen
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Mar 22, 2015, at 10:35 AM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hey all,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Evaluating Samza currently and am running into some odd issues.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I'm currently working off the 'hello-samza' repo and trying to parse a
>>>>>>>>>>>>>>>>> simple kafka topic that I've produced through an external java app
>>>>>>>>>>>>>>>>> (nothing other than a series of sentences) and it's failing pretty hard
>>>>>>>>>>>>>>>>> for me. The base 'hello-samza' set of apps works fine, but as soon as I
>>>>>>>>>>>>>>>>> change the configuration to look at a different Kafka/zookeeper I get
>>>>>>>>>>>>>>>>> the following in the userlogs:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 2015-03-22 17:07:09 KafkaSystemAdmin [WARN] Unable to fetch last offsets for streams [myTopic] due to kafka.common.KafkaException: fetching topic metadata for topics [Set(myTopic)] from broker [ArrayBuffer(id:0,host:redacted,port:9092)] failed. Retrying.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The modifications are pretty straightforward.
>>>>>>>>>>>>>>>>> In the Wikipedia-parser.properties, I've changed the following:
>>>>>>>>>>>>>>>>> task.inputs=kafka.myTopic
>>>>>>>>>>>>>>>>> systems.kafka.consumer.zookeeper.connect=redacted:2181/
>>>>>>>>>>>>>>>>> systems.kafka.consumer.auto.offset.reset=smallest
>>>>>>>>>>>>>>>>> systems.kafka.producer.metadata.broker.list=redacted:9092
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> and in the actual java file WikipediaParserStreamTask.java
>>>>>>>>>>>>>>>>> public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
>>>>>>>>>>>>>>>>>   Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
>>>>>>>>>>>>>>>>>   WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>   try {
>>>>>>>>>>>>>>>>>     System.out.println(event.getRawEvent());
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> And then following the compile/extract/run process outlined in the
>>>>>>>>>>>>>>>>> hello-samza website.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Any thoughts? I've looked online for any 'super simple' examples of
>>>>>>>>>>>>>>>>> ingesting kafka in samza with very little success.
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Thanks and regards
>>>>>>>>>>>>>
>>>>>>>>>>>>> Chinmay Soman
>
> --
> Thanks and regards
>
> Chinmay Soman

--
Thanks and regards

Chinmay Soman
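
For anyone reading this thread later: the two exceptions discussed above come from the same mismatch. The JSON serde cannot parse a plain sentence, and once the string serde is configured the task receives a String, so the cast to Map on line 38 fails. Below is a minimal sketch of that second failure and one way to guard against it. It uses plain Java only and does not call any Samza API; the class name, the helper names and the "raw" key are hypothetical, chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not code from hello-samza or Samza itself.
class MessageShapeSketch {

    // Normalizes whatever the configured serde handed over into a Map,
    // instead of casting blindly as the original process() method does.
    @SuppressWarnings("unchecked")
    public static Map<String, Object> toEventMap(Object message) {
        if (message instanceof Map) {
            // What a JSON serde would typically produce for a JSON object.
            return (Map<String, Object>) message;
        }
        if (message instanceof String) {
            // What a string serde produces: wrap the sentence under an
            // assumed "raw" key so downstream code can still read it.
            Map<String, Object> wrapped = new HashMap<>();
            wrapped.put("raw", message);
            return wrapped;
        }
        throw new IllegalArgumentException("Unexpected message type: "
                + (message == null ? "null" : message.getClass().getName()));
    }

    // Reproduces the blind cast from the thread and reports whether it fails.
    @SuppressWarnings("unchecked")
    public static boolean blindCastFails(Object message) {
        try {
            Map<String, Object> ignored = (Map<String, Object>) message;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Object fromStringSerde = "My sentence"; // stand-in for envelope.getMessage()
        System.out.println(blindCastFails(fromStringSerde));
        System.out.println(toEventMap(fromStringSerde).get("raw"));
    }
}
```

In the real task the same check would sit at the top of process(), before the WikipediaFeedEvent is constructed. Whether wrapping the sentence in a Map is the right shape depends on what the downstream stream expects, so treat this as a starting point only.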