Thanks for the input, guys!

Yes, I did get that the <consumer> would be putting data into Samza, so
maybe I am misunderstanding something else :(
I have an RssSystemFactory which creates an RssConsumer (which extends
BlockingEnvelopeMap, which in turn implements SystemConsumer). This
"RssConsumer" acts as an observer of an "RssFeed", which checks RSS URLs
and puts the new entries into the consumer; this is possible because my
consumer is a BlockingEnvelopeMap. My "RssFeed" should only stop when
"RssConsumer" calls its stop method.

This was my problem: I thought that the "RssFeed" created together with
the RssConsumer by the SystemFactory was already running in a separate
thread, but it wasn't. So now I have created a separate thread that checks
for RSS updates and puts the data into my consumer. This works as I
expected.
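
For anyone hitting the same thing, here is a minimal sketch of the pattern,
not the actual code from my repository. It uses only the JDK so it can run
on its own: QueueBackedConsumer is a hypothetical stand-in for RssConsumer
(a real one would extend BlockingEnvelopeMap), and the fixed list of
strings stands in for whatever the RssFeed fetches. The point is only that
the feed loop runs in its own thread, so start() returns immediately.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class RssPollingSketch {

    /** Hypothetical stand-in for RssConsumer; not a Samza class. */
    static class QueueBackedConsumer {
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        private final Thread feedThread;
        private volatile boolean running = true;

        QueueBackedConsumer(final List<String> items) {
            // The feed loop lives in its own thread, so the caller of
            // start() is never blocked by the fetching work.
            feedThread = new Thread(() -> {
                for (String item : items) {
                    if (!running) {
                        return;
                    }
                    try {
                        queue.put(item); // hand the entry to the consumer
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }, "rss-feed-poller");
            feedThread.setDaemon(true);
        }

        /** Starts the feed thread and returns immediately. */
        void start() {
            feedThread.start();
        }

        /** Only the consumer stops the feed, as described above. */
        void stop() throws InterruptedException {
            running = false;
            feedThread.interrupt();
            feedThread.join();
        }

        /** Returns the next entry, or null if none arrives in time. */
        String poll(long timeoutMs) throws InterruptedException {
            return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        QueueBackedConsumer consumer =
                new QueueBackedConsumer(Arrays.asList("entry-1", "entry-2"));
        consumer.start();
        System.out.println(consumer.poll(1000));
        System.out.println(consumer.poll(1000));
        consumer.stop();
    }
}
```

In the real consumer the queue.put call would be replaced by whatever
hands the envelope to the BlockingEnvelopeMap; the threading is the part
that mattered for my problem.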

I run the Kafka console consumer like this:

deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
  --topic rss-raw

Thanks for the input; it helped me see what I was doing wrong :)


Renato M.

2015-05-22 20:29 GMT+02:00 Yan Fang <yanfang...@gmail.com>:

> Hi Renato,
>
> There may be a misunderstanding of the concept. A Consumer feeds messages
> into Samza, while a Producer sends messages from Samza to other systems.
> So if you implement the Consumer, you should be able to see the messages
> in the StreamTask. That is why Naveen was confused.
>
> Cheers,
>
> Fang, Yan
> yanfang...@gmail.com
>
> On Fri, May 22, 2015 at 9:59 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Renato,
> >
> > Could you paste your console consumer command here?
> >
> > Guozhang
> >
> > On Fri, May 22, 2015 at 9:43 AM, Naveen S <navg...@gmail.com> wrote:
> >
> > > Hey Renato,
> > > Is there any specific reason why you are extending the
> > > blocking envelope class instead of implementing a StreamTask?
> > > http://samza.apache.org/learn/documentation/0.9/api/overview.html
> > >
> > > --Naveen
> > >
> > >
> > > On Fri, May 22, 2015 at 8:06 AM, Renato Marroquín Mogrovejo <
> > > renatoj.marroq...@gmail.com> wrote:
> > >
> > > > Hi there,
> > > >
> > > > > I have developed an RSS reader application using Samza, but I am
> > > > > having problems when using kafka-console-consumer to read what has
> > > > > been fetched from the RSS feeds.
> > > > > My RssConsumer uses an RssFeed object which fetches (until stopped
> > > > > by the RssConsumer) all new RSS feeds and hands them back to the
> > > > > RssConsumer, which puts them into a single partition[2]. From my
> > > > > logs I can see that new feeds are being put into my partition, but
> > > > > I cannot get them with the console consumer. I guess I am not
> > > > > putting them correctly, or I am producing some blocking behaviour
> > > > > somehow?
> > > > > I am using the hello-samza 0.10 project, which uses the latest
> > > > > Samza version as well. I don't think this is an issue with Samza;
> > > > > it is probably something I am doing wrong :(
> > > > Thanks in advance for any suggestions.
> > > >
> > > >
> > > > Renato M.
> > > >
> > > > [1] https://github.com/renato2099/hello-samza
> > > > [2]
> > > >
> > > >
> > >
> >
> https://github.com/renato2099/hello-samza/blob/master/src/main/java/samza/examples/rss/system/RssConsumer.java#L42
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
