Hi Shrijeet;

I do not see any stacktarce. It stuck at poll.

Here is my consumer conf;

zookeeper.connect=xx.com\:2181

metadata.broker.list=yy.com\:9092

enable.auto.commit=true

auto.commit.interval.ms=101

max.partition.fetch.bytes=35

session.timeout.ms=3001

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer

value.deserializer=com.xx.process.RawFileSerializer

heartbeat.interval.ms=1000

auto.offset.reset=earliest

serializer.class=com.xx.process.FileSerializer

bootstrap.servers=yy.com\:9092

group.id=test

consumer.timeout.ms=1

Thanks.

On 30 March 2016 at 10:10, Shrijeet Paliwal <shrijeet.pali...@gmail.com>
wrote:

> Can you share complete Consumer configuration? Also what does stack-trace
> show,  is it stuck in poll?
>
> I think I have seen this before in one of my tests, in my case I had pinned
> it to side effect of https://issues.apache.org/jira/browse/KAFKA-1894.
> Essentially poll(timeout) doesn't always respect the timeout :-|
>
> --
> Shrijeet
>
> On Tue, Mar 29, 2016 at 3:42 PM, Ratha v <vijayara...@gmail.com> wrote:
>
> > Hi oleg;
> >
> >    1. The consumer shell  (The default tool :./kafka-console-consumer.sh
> >    --zookeeper xxx:2181 --topic yy
> >    ) consumes my messages. Only my programmed listener is not consuming
> ..I
> >    provided the code in the previous thread.
> >
> >
> > Here is my producer (I believe this works, because, the* kafka
> consumer.sh
> > *
> >  consumes)
> >
> >
> >
> >
> > import java.io.FileNotFoundException;
> > import java.util.Properties;
> >
> > import com.xx.core.impl.KafkaConfigurationLoader;
> > import com.xx.core.model.base.RawFile;
> >
> > import org.apache.kafka.clients.producer.KafkaProducer;
> > import org.apache.kafka.clients.producer.ProducerRecord;
> >
> > /**
> >  * Class to produce some test messages to the Kafka server
> >  *
> >  * @author ratha
> >  *
> >  */
> > public class KafkaMessageProducer {
> > private Properties properties;
> > private String topic;
> > private File file;
> > private KafkaProducer<String, File> producer;
> >
> > public KafkaMessageProducer(String topic, File file) {
> > this.topic = topic;
> > this.file = file;
> >
> > KafkaConfigurationLoader confLoader = new KafkaConfigurationLoader();
> > try {
> > properties = confLoader.loadProducerConfig();
> >
> > producer = new KafkaProducer<>(properties);
> > } catch (FileNotFoundException e) {
> > e.printStackTrace();
> > }
> > }
> >
> > public void generateMessgaes() {
> > try {
> > for(int i=0; i<10;i++){
> > producer.send(new ProducerRecord<String, File>(topic, file));
> > }
> > } catch (Exception e) {
> > e.printStackTrace();
> > System.out.println("Error in publishing messages to the topic : " +
> topic);
> >
> > } finally {
> > producer.close();
> > }
> > }
> >
> > }
> >
> > Thanks
> >
> >
> > On 30 March 2016 at 09:31, Oleg Zhurakousky <
> ozhurakou...@hortonworks.com>
> > wrote:
> >
> > > Ratha
> > >
> > > " I published messages and started the listener ---> No success”.
> > >
> > > I would stop right here as you already see an issue. So I am assuming
> the
> > > listener you started is console listener and with that I am going to
> > assume
> > > it works providing message is on the topic. And since nothing happened
> > then
> > > you probably have issue in producer.
> > >
> > > Could you post producer code.
> > >
> > > Oleg
> > >
> > > > On Mar 29, 2016, at 6:24 PM, Ratha v <vijayara...@gmail.com> wrote:
> > > >
> > > > Hi Oleg;
> > > >
> > > > Thanks for the guide..Here is my troubleshooting steps;
> > > >
> > > >
> > > >   1. I published messages and started the listener ---> No success
> > > >   2. I set the "auto.offset.reset" property to *earliest  --> No
> > success*
> > > >   3. *Set the "consumer.timeout" to 1 --> Nothing happens*
> > > >   4. *T*hought may be my messages are not published, so started the
> > > >   consumer shell script (default tool in the kafka distribution),
> that
> > > >   consumes my messages well.
> > > >   5. Changed hasnext() to while (true)  --> no change in the
> behaviour
> > > >
> > > >
> > > > Im really confused..:(
> > > >
> > > > Here is the code;
> > > >
> > > > *executor*
> > > >
> > > > public void start() {
> > > >
> > > > List<String> topics = Arrays.asList(topic);
> > > >
> > > > ExecutorService executor =
> Executors.newFixedThreadPool(CoreConstants.
> > > > THREAD_SIZE);
> > > >
> > > > ListenerThread lThread = new ListenerThread(topics, properties);
> > > >
> > > > executor.submit(lThread);
> > > >
> > > >
> > > > Runtime.getRuntime().addShutdownHook(new Thread() {
> > > >
> > > > @Override
> > > >
> > > > public void run() {
> > > >
> > > > lThread.shutdown();
> > > >
> > > > executor.shutdown();
> > > >
> > > > try {
> > > >
> > > > executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
> > > >
> > > > } catch (InterruptedException e) {
> > > >
> > > > e.printStackTrace();
> > > >
> > > > }
> > > >
> > > > }
> > > >
> > > > });
> > > >
> > > > }
> > > >
> > > >
> > > >
> > > > *Thread*
> > > >
> > > >
> > > >
> > > > public void run() {
> > > >
> > > > try {
> > > >
> > > > consumer.subscribe(topics);
> > > >
> > > > while (true) {
> > > >
> > > > ConsumerRecords<String, File> records = consumer.poll(100);
> > > >
> > > > System.out.println("&&&&&&2222 : "+records.count());
> > > >
> > > > for (ConsumerRecord<String, RawFile> record : records) {
> > > >
> > > > System.out.println("&&&&&&333");
> > > >
> > > > FileProcessor processor = new FileProcessor();
> > > >
> > > > processor.processFile(record.value());
> > > >
> > > >
> > > > System.out.println("&&&&&&"+record.value());
> > > >
> > > > }
> > > >
> > > > }
> > > >
> > > >
> > > > } catch (Throwable e) {
> > > >
> > > > e.printStackTrace();
> > > >
> > > > System.out.println("eror in polling");
> > > >
> > > > // ignore for shutdown
> > > >
> > > > } finally {
> > > >
> > > > consumer.close();
> > > >
> > > > }
> > > >
> > > >
> > > >
> > > > Do you think any other issues from my end?
> > > >
> > > > Thanks
> > > >
> > > >
> > > > On 29 March 2016 at 23:13, Oleg Zhurakousky <
> > > ozhurakou...@hortonworks.com>
> > > > wrote:
> > > >
> > > >> Ratha
> > > >>
> > > >> It appears you have couple of issues here, so I’ll start with the
> > > consumer
> > > >> first.
> > > >> If you do a search on this mailing list on “Consumer deadlock” in
> the
> > > >> subject you’ll find a thread where similar symptoms were discussed.
> > > >> Basically the hasNext() method you mentioned is implemented as a
> > > blocking
> > > >> call and while we may all have opinion about that decision and why
> > > Iterator
> > > >> was chosen in the first place, it is what it is. But from what I
> > > understand
> > > >> it simply means that there are no messages to poll from the topic
> > (yes I
> > > >> know, hasNext()=false seems natural here but. . .). What you can do
> is
> > > set ‘
> > > >> consumer.timeout.ms’ property to value such as ‘1’. By doing so you
> > are
> > > >> stating that you are willing to block for no longer then 1
> > millisecond.
> > > >> But you also mention that you are sending message to the topic and
> > > >> therefore have reasonable expectation to poll something from it but
> > yet
> > > >> you’re blocking. That is strange indeed. What I would suggest is to
> > try
> > > one
> > > >> thing at the time. Use your Java producer, in conjunction with
> console
> > > >> consumer. This will help to narrow down the problem (e.g., issue
> with
> > > >> producer that may not be actually sending). Then hopefully if you
> > > receive
> > > >> successfully then you know the problem is in the consumer and so on.
> > > >>
> > > >> Cheers
> > > >> Oleg
> > > >>
> > > >> On Mar 29, 2016, at 12:48 AM, Ratha v <vijayara...@gmail.com
> <mailto:
> > > >> vijayara...@gmail.com>> wrote:
> > > >>
> > > >> Hi all;
> > > >> I publish a java object and try to consume it..
> > > >> I have a poll method to consume objects, but it never returns any
> > > >> objects..My program runs forever.(?)
> > > >>
> > > >> *ConsumerThread*
> > > >>
> > > >> public void run() {
> > > >>
> > > >> try {
> > > >>
> > > >> consumer.subscribe(topics);
> > > >>
> > > >>
> > > >> Iterator<ConsumerRecord<String, File>> it =
> > > consumer.poll(1000).iterator();
> > > >>
> > > >> while (it.hasNext()) {
> > > >>
> > > >> ConsumerRecord<String, File> record = it.next();
> > > >>
> > > >> FileProcessor processor = new FileProcessor();
> > > >>
> > > >> processor.processFile(record.value());
> > > >>
> > > >> System.out.println("-----" + ": " + record);
> > > >>
> > > >>
> > > >>
> > > >> When i debug it never goes inside the while loop. Why is that?
> > > >>
> > > >> I publish object like this;
> > > >>
> > > >> producer.send(new ProducerRecord<String, File>(topic, file));
> > > >>
> > > >>
> > > >> Thanks
> > > >> --
> > > >> -Ratha
> > > >> http://vvratha.blogspot.com/
> > > >>
> > > >>
> > > >
> > > >
> > > > --
> > > > -Ratha
> > > > http://vvratha.blogspot.com/
> > >
> > >
> >
> >
> > --
> > -Ratha
> > http://vvratha.blogspot.com/
> >
>



-- 
-Ratha
http://vvratha.blogspot.com/

Reply via email to