>I changed the systems.kafka.samza.msg.serde=json to 'string' a while back,
but that caused a separate exception.  However that was many, MANY attempts
ago.

That alone may not work, because systems.kafka.samza.msg.serde sets the serde for every stream on the kafka system (input and output). In your case you're consuming strings and producing JSON, so you probably have to set the serde per stream explicitly.
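
Something along these lines, for example (a rough sketch, assuming the input topic is myTopic and the output topic is wikipedia-edits as in your config; adjust the names to your job):

serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
systems.kafka.streams.myTopic.samza.msg.serde=string
systems.kafka.streams.wikipedia-edits.samza.msg.serde=json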

On Mon, Mar 23, 2015 at 9:24 PM, Chinmay Soman <chinmay.cere...@gmail.com>
wrote:

> Since you're producing String data to 'myTopic', can you try setting the
> string serialization in your config ?
>
> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>
> systems.kafka.streams.myTopic.samza.msg.serde=string
>
>
> On Mon, Mar 23, 2015 at 9:17 PM, Ash W Matheson <ash.mathe...@gmail.com>
> wrote:
>
>> more info - new exception message:
>>
>> Exception in thread "main"
>> org.apache.samza.system.SystemConsumersException: Cannot deserialize an
>> incoming message.
>>
>> Updated the diff in pastebin with the changes.
>>
>> On Mon, Mar 23, 2015 at 8:41 PM, Ash W Matheson <ash.mathe...@gmail.com>
>> wrote:
>>
>> > Gah!  Yeah, those were gone several revisions ago but didn't get nuked
>> in
>> > the last iteration.
>> >
>> > OK, let me do a quick test to see if that was my problem all along.
>> >
>> > On Mon, Mar 23, 2015 at 8:38 PM, Navina Ramesh <
>> > nram...@linkedin.com.invalid> wrote:
>> >
>> >> Hey Ash,
>> >> I was referring to the lines before the try block.
>> >>
>> >>     Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
>> >>     WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
>> >>
>> >>     try {
>> >>       System.out.println("[DWH] should see this");
>> >>       System.out.println(event.getRawEvent());
>> >> …
>> >>
>> >>
>> >> Did you remove those lines as well?
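>> >>
>> >> If the input stays a plain string, that cast has to change too. A rough, untested sketch (assuming systems.kafka.streams.myTopic.samza.msg.serde=string is set, so getMessage() already returns a String):
>> >>
>> >>     public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
>> >>       String rawEvent = (String) envelope.getMessage();
>> >>       System.out.println("[DWH] should see this");
>> >>       System.out.println(rawEvent);
>> >>     }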
>> >>
>> >> Navina
>> >>
>> >> On 3/23/15, 8:31 PM, "Ash W Matheson" <ash.mathe...@gmail.com> wrote:
>> >>
>> >> >Just looking at the diff I posted and it's:
>> >> >
>> >> >
>> >> >   1.      try {
>> >> >   2. -      Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
>> >> >   3. +      System.out.println("[DWH] should see this");
>> >> >   4. +      System.out.println(event.getRawEvent());
>> >> >   5. +      // Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
>> >> >
>> >> >
>> >> >I've removed the Map and added two System.out.println calls.  So no,
>> >> there
>> >> >shouldn't be any reference to
>> >> >Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
>> >> >in the source java file.
>> >> >
>> >> >
>> >> >On Mon, Mar 23, 2015 at 7:42 PM, Ash W Matheson <
>> ash.mathe...@gmail.com>
>> >> >wrote:
>> >> >
>> >> >> I'm in transit right now but if memory serves me everything should
>> be
>> >> >> commented out of that method except for the System.out.println call.
>> >> >>I'll
>> >> >> be home shortly and can confirm.
>> >> >> On Mar 23, 2015 7:28 PM, "Navina Ramesh"
>> <nram...@linkedin.com.invalid
>> >> >
>> >> >> wrote:
>> >> >>
>> >> >>> Hi Ash,
>> >> >>> I just ran wikipedia-parser with your patch. Looks like you have set the message serde correctly in the configs. However, the original code still converts it into a Map for consumption in the WikipediaFeedEvent.
>> >> >>> I am seeing the following (expected):
>> >> >>>
>> >> >>> 2015-03-23 19:17:49 SamzaContainerExceptionHandler [ERROR] Uncaught exception in thread (name=main). Exiting process now.
>> >> >>> java.lang.ClassCastException: java.lang.String cannot be cast to java.util.Map
>> >> >>>  at samza.examples.wikipedia.task.WikipediaParserStreamTask.process(WikipediaParserStreamTask.java:38)
>> >> >>>  at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:133)
>> >> >>>
>> >> >>> Did you make the changes to fix this error? Your patch doesn't seem to have that.
>> >> >>> Line 38: Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
>> >> >>>
>> >> >>>
>> >> >>>
>> >> >>> Lmk so I can investigate further.
>> >> >>>
>> >> >>> Cheers!
>> >> >>> Navina
>> >> >>>
>> >> >>> On 3/23/15, 6:43 PM, "Ash W Matheson" <ash.mathe...@gmail.com>
>> wrote:
>> >> >>>
>> >> >>> >If anyone's interested, I've posted a diff of the project here:
>> >> >>> >http://pastebin.com/6ZW6Y1Vu
>> >> >>> >and the python publisher here: http://pastebin.com/2NvTFDFx
>> >> >>> >
>> >> >>> >if you want to take a stab at it.
>> >> >>> >
>> >> >>> >On Mon, Mar 23, 2015 at 6:04 PM, Ash W Matheson
>> >> >>><ash.mathe...@gmail.com>
>> >> >>> >wrote:
>> >> >>> >
>> >> >>> >> Ok, so very simple test, all running on a local machine, not
>> across
>> >> >>> >> networks and all in the hello-samza repo this time around.
>> >> >>> >>
>> >> >>> >> I've got the datapusher.py file set up to push data into
>> localhost.
>> >> >>>One
>> >> >>> >> event per second.
>> >> >>> >> And a modified hello-samza where I've modified the
>> >> >>> >> WikipediaParserStreamTask.java class to simply read what's
>> there.
>> >> >>> >>
>> >> >>> >> Running them both now and I'm seeing in the stderr files
>> >> >>> >>
>> (deploy/yarn/logs/userlogs/application_XXXXX/container_YYYY/stderr)
>> >> >>>the
>> >> >>> >> following:
>> >> >>> >>
>> >> >>> >> Exception in thread "main" org.apache.samza.system.SystemConsumersException: Cannot deserialize an incoming message.
>> >> >>> >>     at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:293)
>> >> >>> >>     at org.apache.samza.system.SystemConsumers.org$apache$samza$system$SystemConsumers$$poll(SystemConsumers.scala:260)
>> >> >>> >>     at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
>> >> >>> >>     at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
>> >> >>> >>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> >> >>> >>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> >> >>> >>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >> >>> >>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >> >>> >>     at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
>> >> >>> >>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>> >> >>> >>     at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
>> >> >>> >>     at scala.collection.SetLike$class.map(SetLike.scala:93)
>> >> >>> >>     at scala.collection.AbstractSet.map(Set.scala:47)
>> >> >>> >>     at org.apache.samza.system.SystemConsumers.refresh(SystemConsumers.scala:276)
>> >> >>> >>     at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:213)
>> >> >>> >>     at org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(RunLoop.scala:81)
>> >> >>> >>     at org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(RunLoop.scala:81)
>> >> >>> >>     at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>> >> >>> >>     at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>> >> >>> >>     at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:80)
>> >> >>> >>     at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>> >> >>> >>     at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>> >> >>> >>     at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
>> >> >>> >>     at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
>> >> >>> >>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
>> >> >>> >>     at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
>> >> >>> >>     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
>> >> >>> >>     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>> >> >>> >> Caused by: org.codehaus.jackson.JsonParseException: Unexpected character ('M' (code 77)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
>> >> >>> >>  at [Source: [B@5454d285; line: 1, column: 2]
>> >> >>> >>     at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
>> >> >>> >>     at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:385)
>> >> >>> >>     at org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(JsonParserMinimalBase.java:306)
>> >> >>> >>     at org.codehaus.jackson.impl.Utf8StreamParser._handleUnexpectedValue(Utf8StreamParser.java:1581)
>> >> >>> >>     at org.codehaus.jackson.impl.Utf8StreamParser._nextTokenNotInObject(Utf8StreamParser.java:436)
>> >> >>> >>     at org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamParser.java:322)
>> >> >>> >>     at org.codehaus.jackson.map.ObjectMapper._initForReading(ObjectMapper.java:2432)
>> >> >>> >>     at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2389)
>> >> >>> >>     at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1667)
>> >> >>> >>     at org.apache.samza.serializers.JsonSerde.fromBytes(JsonSerde.scala:33)
>> >> >>> >>     at org.apache.samza.serializers.SerdeManager.fromBytes(SerdeManager.scala:115)
>> >> >>> >>     at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:290)
>> >> >>> >>
>> >> >>> >>
>> >> >>> >> I changed the systems.kafka.samza.msg.serde=json to 'string' a while back, but that caused a separate exception.  However that was many, MANY attempts ago.
>> >> >>> >>
>> >> >>> >> On Mon, Mar 23, 2015 at 5:23 PM, Ash W Matheson <
>> >> >>> ash.mathe...@gmail.com>
>> >> >>> >> wrote:
>> >> >>> >>
>> >> >>> >>> Ahh, I was going to add it to the run-class.sh script.
>> >> >>> >>>
>> >> >>> >>> Yeah, it's already there by default:
>> >> >>> >>>
>> >> >>> >>>
>> >> >>> >>> # Metrics
>> >> >>> >>> metrics.reporters=snapshot,jmx
>> >> >>> >>> metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
>> >> >>> >>> metrics.reporter.snapshot.stream=kafka.metrics
>> >> >>> >>> metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
>> >> >>> >>>
>> >> >>> >>> So, where would I see those metrics?
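>> >> >>> >>>
>> >> >>> >>> (For reference: with that config the snapshot reporter periodically publishes the metrics to the "metrics" topic on the kafka system, so tailing it with the console consumer should show them, e.g. something like the line below, assuming the hello-samza grid layout; the jmx reporter additionally exposes the same metrics over JMX, so attaching jconsole to the container JVM works too.
>> >> >>> >>>
>> >> >>> >>> deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic metrics
>> >> >>> >>> )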
>> >> >>> >>>
>> >> >>> >>> On Mon, Mar 23, 2015 at 5:15 PM, Ash W Matheson
>> >> >>> >>><ash.mathe...@gmail.com>
>> >> >>> >>> wrote:
>> >> >>> >>>
>> >> >>> >>>> read: I'm a C++ programmer looking at Java for the first time
>> in
>> >> >>>> 10
>> >> >>> >>>> years
>> >> >>> >>>>
>> >> >>> >>>> On Mon, Mar 23, 2015 at 5:13 PM, Ash W Matheson
>> >> >>> >>>><ash.mathe...@gmail.com>
>> >> >>> >>>> wrote:
>> >> >>> >>>>
>> >> >>> >>>>> I'm assuming I have Jmx defined ... where would that get set?
>> >> >>> >>>>>
>> >> >>> >>>>> On Mon, Mar 23, 2015 at 5:08 PM, Chinmay Soman <
>> >> >>> >>>>> chinmay.cere...@gmail.com> wrote:
>> >> >>> >>>>>
>> >> >>> >>>>>> Hey Ash,
>> >> >>> >>>>>>
>> >> >>> >>>>>> Can you see your job metrics (if you have the Jmx metrics defined) to see if your job is actually doing anything? My only guess at this point is the process method is not being called because somehow there's no incoming data. I could be totally wrong of course.
>> >> >>> >>>>>>
>> >> >>> >>>>>> On Mon, Mar 23, 2015 at 4:28 PM, Ash W Matheson <
>> >> >>> >>>>>> ash.mathe...@gmail.com>
>> >> >>> >>>>>> wrote:
>> >> >>> >>>>>>
>> >> >>> >>>>>> > Just to be clear, here's what's changed from the default
>> >> >>> >>>>>>hello-samza
>> >> >>> >>>>>> repo:
>> >> >>> >>>>>> >
>> >> >>> >>>>>> > wikipedia-parser.properties==========================
>> >> >>> >>>>>> > task.inputs=kafka.myTopic
>> >> >>> >>>>>> > systems.kafka.consumer.zookeeper.connect=
>> >> >>> >>>>>> > ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com:2181/
>> >> >>> >>>>>> > systems.kafka.consumer.auto.offset.reset=smallest
>> >> >>> >>>>>> >
>> >> >>> >>>>>> > WikipediaParserStreamTask.java =====================
>> >> >>> >>>>>> >   public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
>> >> >>> >>>>>> >     Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
>> >> >>> >>>>>> >     WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
>> >> >>> >>>>>> >
>> >> >>> >>>>>> >     try {
>> >> >>> >>>>>> >       System.out.println(event.getRawEvent());
>> >> >>> >>>>>> >       // Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
>> >> >>> >>>>>> >
>> >> >>> >>>>>> >       // parsedJsonObject.put("channel", event.getChannel());
>> >> >>> >>>>>> >       // parsedJsonObject.put("source", event.getSource());
>> >> >>> >>>>>> >       // parsedJsonObject.put("time", event.getTime());
>> >> >>> >>>>>> >
>> >> >>> >>>>>> >       // collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-edits"), parsedJsonObject));
>> >> >>> >>>>>> >
>> >> >>> >>>>>> > as well as the aforementioned changes to the log4j.xml
>> file.
>> >> >>> >>>>>> >
>> >> >>> >>>>>> > The data pushed into the 'myTopic' topic is nothing more than a sentence.
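>> >> >>> >>>>>> >
>> >> >>> >>>>>> > (For reference, the same kind of input can be produced by hand with the Kafka console producer and typing a line, e.g. something like the following, assuming the hello-samza grid layout and a local broker on 9092:
>> >> >>> >>>>>> >
>> >> >>> >>>>>> > deploy/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic myTopic
>> >> >>> >>>>>> > )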
>> >> >>> >>>>>> >
>> >> >>> >>>>>> >
>> >> >>> >>>>>> > On Mon, Mar 23, 2015 at 4:16 PM, Ash W Matheson <
>> >> >>> >>>>>> ash.mathe...@gmail.com>
>> >> >>> >>>>>> > wrote:
>> >> >>> >>>>>> >
>> >> >>> >>>>>> > > yep, modified log4j.xml to look like this:
>> >> >>> >>>>>> > >
>> >> >>> >>>>>> > >   <root>
>> >> >>> >>>>>> > >     <priority value="debug" />
>> >> >>> >>>>>> > >     <appender-ref ref="RollingAppender"/>
>> >> >>> >>>>>> > >     <appender-ref ref="jmx" />
>> >> >>> >>>>>> > >   </root>
>> >> >>> >>>>>> > >
>> >> >>> >>>>>> > > Not sure what you mean by #2.
>> >> >>> >>>>>> > >
>> >> >>> >>>>>> > > However, I'm running now, not seeing any exceptions, but still not seeing any output from System.out.println(...)
>> >> >>> >>>>>> > >
>> >> >>> >>>>>> > > On Mon, Mar 23, 2015 at 11:29 AM, Naveen Somasundaram <
>> >> >>> >>>>>> > > nsomasunda...@linkedin.com.invalid> wrote:
>> >> >>> >>>>>> > >
>> >> >>> >>>>>> > >> Hey Ash,
>> >> >>> >>>>>> > >>                1. Did you happen to modify your
>> log4j.xml
>> >> ?
>> >> >>> >>>>>> > >>                2. Can you print the class path that was
>> >> >>> printed
>> >> >>> >>>>>> when the
>> >> >>> >>>>>> > >> job started ? I am wondering if log4j was not loaded or
>> >> not
>> >> >>> >>>>>> present in
>> >> >>> >>>>>> > the
>> >> >>> >>>>>> > >> path where it's looking for. If you have been using
>> hello
>> >> >>> >>>>>>samza,
>> >> >>> >>>>>> it
>> >> >>> >>>>>> > should
>> >> >>> >>>>>> > >> have pulled it from Maven.
>> >> >>> >>>>>> > >>
>> >> >>> >>>>>> > >> Thanks,
>> >> >>> >>>>>> > >> Naveen
>> >> >>> >>>>>> > >>
>> >> >>> >>>>>> > >> On Mar 22, 2015, at 10:35 AM, Ash W Matheson <
>> >> >>> >>>>>> ash.mathe...@gmail.com>
>> >> >>> >>>>>> > >> wrote:
>> >> >>> >>>>>> > >>
>> >> >>> >>>>>> > >> > Hey all,
>> >> >>> >>>>>> > >> >
>> >> >>> >>>>>> > >> > Evaluating Samza currently and am running into some odd issues.
>> >> >>> >>>>>> > >> >
>> >> >>> >>>>>> > >> > I'm currently working off the 'hello-samza' repo and trying to parse a simple kafka topic that I've produced through an external java app (nothing other than a series of sentences) and it's failing pretty hard for me.
>> >> >>> >>>>>> > >> > The base 'hello-samza' set of apps works fine, but as soon as I change the configuration to look at a different Kafka/zookeeper I get the following in the userlogs:
>> >> >>> >>>>>> > >> >
>> >> >>> >>>>>> > >> > 2015-03-22 17:07:09 KafkaSystemAdmin [WARN] Unable to fetch last offsets for streams [myTopic] due to kafka.common.KafkaException: fetching topic metadata for topics [Set(myTopic)] from broker [ArrayBuffer(id:0,host:redacted,port:9092)] failed. Retrying.
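>> >> >>> >>>>>> > >> >
>> >> >>> >>>>>> > >> > (One hedged guess for that particular failure: the hostname the broker advertises has to be resolvable and reachable from the machine running the job. With Kafka 0.8 that is usually controlled in the broker's server.properties, e.g. something along the lines of:
>> >> >>> >>>>>> > >> >
>> >> >>> >>>>>> > >> > advertised.host.name=<hostname reachable from the Samza containers>
>> >> >>> >>>>>> > >> > advertised.port=9092
>> >> >>> >>>>>> > >> > )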
>> >> >>> >>>>>> > >> >
>> >> >>> >>>>>> > >> >
>> >> >>> >>>>>> > >> > The modifications are pretty straightforward.  In the
>> >> >>> >>>>>> > >> > Wikipedia-parser.properties, I've changed the
>> following:
>> >> >>> >>>>>> > >> > task.inputs=kafka.myTopic
>> >> >>> >>>>>> > >> > systems.kafka.consumer.zookeeper.connect=redacted:2181/
>> >> >>> >>>>>> > >> > systems.kafka.consumer.auto.offset.reset=smallest
>> >> >>> >>>>>> > >> > systems.kafka.producer.metadata.broker.list=redacted:9092
>> >> >>> >>>>>> > >> >
>> >> >>> >>>>>> > >> > and in the actual java file WikipediaParserStreamTask.java
>> >> >>> >>>>>> > >> >  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
>> >> >>> >>>>>> > >> >    Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
>> >> >>> >>>>>> > >> >    WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
>> >> >>> >>>>>> > >> >
>> >> >>> >>>>>> > >> >    try {
>> >> >>> >>>>>> > >> >        System.out.println(event.getRawEvent());
>> >> >>> >>>>>> > >> >
>> >> >>> >>>>>> > >> > And then following the compile/extract/run process outlined in the hello-samza website.
>> >> >>> >>>>>> > >> >
>> >> >>> >>>>>> > >> > Any thoughts?  I've looked online for any 'super simple' examples of ingesting kafka in samza with very little success.
>> >> >>> >>>>>> > >>
>> >> >>> >>>>>> > >>
>> >> >>> >>>>>> > >
>> >> >>> >>>>>> >
>> >> >>> >>>>>>
>> >> >>> >>>>>>
>> >> >>> >>>>>>
>> >> >>> >>>>>> --
>> >> >>> >>>>>> Thanks and regards
>> >> >>> >>>>>>
>> >> >>> >>>>>> Chinmay Soman
>> >> >>> >>>>>>
>> >> >>> >>>>>
>> >> >>> >>>>>
>> >> >>> >>>>
>> >> >>> >>>
>> >> >>> >>
>> >> >>>
>> >> >>>
>> >>
>> >>
>> >
>>
>
>
>
> --
> Thanks and regards
>
> Chinmay Soman
>



-- 
Thanks and regards

Chinmay Soman
