Ratha

"I published messages and started the listener ---> No success".

I would stop right there, as you can already see an issue. I am assuming the listener you started is the console listener, and that it works provided there is a message on the topic. Since nothing happened, you probably have an issue in the producer. Could you post the producer code?

Oleg

> On Mar 29, 2016, at 6:24 PM, Ratha v <vijayara...@gmail.com> wrote:
>
> Hi Oleg;
>
> Thanks for the guide. Here are my troubleshooting steps:
>
> 1. I published messages and started the listener ---> No success
> 2. I set the "auto.offset.reset" property to earliest --> No success
> 3. Set the "consumer.timeout" to 1 --> Nothing happens
> 4. Thought maybe my messages were not published, so I started the
>    consumer shell script (default tool in the Kafka distribution), which
>    consumes my messages well.
> 5. Changed hasnext() to while (true) --> no change in the behaviour
>
> I'm really confused. :(
>
> Here is the code:
>
> *executor*
>
> public void start() {
>     List<String> topics = Arrays.asList(topic);
>     ExecutorService executor = Executors.newFixedThreadPool(CoreConstants.
>             THREAD_SIZE);
>     ListenerThread lThread = new ListenerThread(topics, properties);
>     executor.submit(lThread);
>
>     Runtime.getRuntime().addShutdownHook(new Thread() {
>         @Override
>         public void run() {
>             lThread.shutdown();
>             executor.shutdown();
>             try {
>                 executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
>             } catch (InterruptedException e) {
>                 e.printStackTrace();
>             }
>         }
>     });
> }
>
> *Thread*
>
> public void run() {
>     try {
>         consumer.subscribe(topics);
>         while (true) {
>             ConsumerRecords<String, File> records = consumer.poll(100);
>             System.out.println("&&&&&&2222 : "+records.count());
>             for (ConsumerRecord<String, RawFile> record : records) {
>                 System.out.println("&&&&&&333");
>                 FileProcessor processor = new FileProcessor();
>                 processor.processFile(record.value());
>                 System.out.println("&&&&&&"+record.value());
>             }
>         }
>     } catch (Throwable e) {
>         e.printStackTrace();
>         System.out.println("eror in polling");
>         // ignore for shutdown
>     } finally {
>         consumer.close();
>     }
>
> Do you think there are any other issues on my end?
>
> Thanks
>
> On 29 March 2016 at 23:13, Oleg Zhurakousky <ozhurakou...@hortonworks.com>
> wrote:
>
>> Ratha
>>
>> It appears you have a couple of issues here, so I’ll start with the
>> consumer first.
>> If you search this mailing list for “Consumer deadlock” in the
>> subject, you’ll find a thread where similar symptoms were discussed.
>> Basically, the hasNext() method you mentioned is implemented as a blocking
>> call, and while we may all have opinions about that decision and why
>> Iterator was chosen in the first place, it is what it is. From what I
>> understand, it simply means that there are no messages to poll from the
>> topic (yes, I know, hasNext()=false seems natural here, but . . .). What
>> you can do is set the ‘consumer.timeout.ms’ property to a value such as ‘1’.
>> By doing so, you are stating that you are willing to block for no longer
>> than 1 millisecond.
>> But you also mention that you are sending messages to the topic and
>> therefore have a reasonable expectation to poll something from it, yet
>> you’re blocking. That is strange indeed. What I would suggest is to try
>> one thing at a time. Use your Java producer in conjunction with the
>> console consumer. This will help to narrow down the problem (e.g., an
>> issue with a producer that may not actually be sending). Then, if you
>> receive successfully, you know the problem is in the consumer, and so on.
>>
>> Cheers
>> Oleg
>>
>> On Mar 29, 2016, at 12:48 AM, Ratha v <vijayara...@gmail.com> wrote:
>>
>> Hi all;
>> I publish a Java object and try to consume it.
>> I have a poll method to consume objects, but it never returns any
>> objects. My program runs forever(?).
>>
>> *ConsumerThread*
>>
>> public void run() {
>>     try {
>>         consumer.subscribe(topics);
>>
>>         Iterator<ConsumerRecord<String, File>> it = consumer.poll(1000).iterator();
>>         while (it.hasNext()) {
>>             ConsumerRecord<String, File> record = it.next();
>>             FileProcessor processor = new FileProcessor();
>>             processor.processFile(record.value());
>>             System.out.println("-----" + ": " + record);
>>
>> When I debug, it never goes inside the while loop. Why is that?
>>
>> I publish the object like this:
>>
>> producer.send(new ProducerRecord<String, File>(topic, file));
>>
>> Thanks
>> --
>> -Ratha
>> http://vvratha.blogspot.com/
>
> --
> -Ratha
> http://vvratha.blogspot.com/