Hi Oleg,

Thanks for the guidance. Here are my troubleshooting steps:
1. I published messages and started the listener: no success.
2. I set the "auto.offset.reset" property to "earliest": no success.
3. I set "consumer.timeout" to 1: nothing happens.
4. I thought maybe my messages were not being published, so I started the consumer shell script (the default tool in the Kafka distribution), and it consumes my messages fine.
5. I changed hasNext() to while (true): no change in the behaviour.

I'm really confused. :( Here is the code:

*executor*

    public void start() {
        List<String> topics = Arrays.asList(topic);
        ExecutorService executor = Executors.newFixedThreadPool(CoreConstants.THREAD_SIZE);
        ListenerThread lThread = new ListenerThread(topics, properties);
        executor.submit(lThread);

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                lThread.shutdown();
                executor.shutdown();
                try {
                    executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

*Thread*

    public void run() {
        try {
            consumer.subscribe(topics);
            while (true) {
                ConsumerRecords<String, File> records = consumer.poll(100);
                System.out.println("&&&&&&2222 : " + records.count());
                for (ConsumerRecord<String, RawFile> record : records) {
                    System.out.println("&&&&&&333");
                    FileProcessor processor = new FileProcessor();
                    processor.processFile(record.value());
                    System.out.println("&&&&&&" + record.value());
                }
            }
        } catch (Throwable e) {
            e.printStackTrace();
            System.out.println("eror in polling");
            // ignore for shutdown
        } finally {
            consumer.close();
        }
    }

Do you think there are any other issues on my end?

Thanks

On 29 March 2016 at 23:13, Oleg Zhurakousky <ozhurakou...@hortonworks.com> wrote:

> Ratha
>
> It appears you have a couple of issues here, so I’ll start with the consumer
> first.
> If you do a search on this mailing list on “Consumer deadlock” in the
> subject you’ll find a thread where similar symptoms were discussed.
> Basically the hasNext() method you mentioned is implemented as a blocking
> call and while we may all have an opinion about that decision and why Iterator
> was chosen in the first place, it is what it is. But from what I understand
> it simply means that there are no messages to poll from the topic (yes I
> know, hasNext()=false seems natural here but. . .). What you can do is set the
> ‘consumer.timeout.ms’ property to a value such as ‘1’. By doing so you are
> stating that you are willing to block for no longer than 1 millisecond.
> But you also mention that you are sending messages to the topic and
> therefore have a reasonable expectation to poll something from it, and yet
> you’re blocking. That is strange indeed. What I would suggest is to try one
> thing at a time. Use your Java producer in conjunction with the console
> consumer. This will help to narrow down the problem (e.g., an issue with a
> producer that may not actually be sending). Then, if you receive
> successfully, you know the problem is in the consumer, and so on.
>
> Cheers
> Oleg
>
> On Mar 29, 2016, at 12:48 AM, Ratha v <vijayara...@gmail.com> wrote:
>
> Hi all;
> I publish a java object and try to consume it..
> I have a poll method to consume objects, but it never returns any
> objects..My program runs forever.(?)
>
> *ConsumerThread*
>
> public void run() {
>     try {
>         consumer.subscribe(topics);
>         Iterator<ConsumerRecord<String, File>> it = consumer.poll(1000).iterator();
>         while (it.hasNext()) {
>             ConsumerRecord<String, File> record = it.next();
>             FileProcessor processor = new FileProcessor();
>             processor.processFile(record.value());
>             System.out.println("-----" + ": " + record);
>
> When I debug, it never goes inside the while loop. Why is that?
>
> I publish the object like this:
>
> producer.send(new ProducerRecord<String, File>(topic, file));
>
> Thanks
> --
> -Ratha
> http://vvratha.blogspot.com/

--
-Ratha
http://vvratha.blogspot.com/
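
As a reference point for step 2 above, here is a minimal sketch of the new-consumer settings this thread touches on, built with plain `java.util.Properties` so it runs without the Kafka client on the classpath. The config keys are the standard consumer ones; the broker address, the group id, and the `com.example.FileDeserializer` class name are placeholders made up for illustration, not values from the original code. One thing worth checking: `auto.offset.reset` only applies when the group has no committed offset, so if the listener already ran (and auto-committed) under the same `group.id`, switching to `earliest` afterwards would not rewind it. Trying a fresh group id is a cheap way to rule that out. As far as I know, `consumer.timeout.ms` belongs to the older high-level consumer, so it is left out here.

```java
import java.util.Properties;

/** Sketch only: builds consumer settings; it does not connect to Kafka. */
final class ConsumerConfigSketch {

    /**
     * @param bootstrapServers broker list, e.g. "localhost:9092" (placeholder)
     * @param groupId          consumer group; a new one starts without committed offsets
     */
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Only consulted when the group has no committed offset for a partition.
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Hypothetical class name: it must mirror whatever serializer the producer uses.
        props.setProperty("value.deserializer", "com.example.FileDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker address and group id, for illustration only.
        Properties props = build("localhost:9092", "file-listener-test-1");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting object would take the place of the `properties` argument passed to `ListenerThread` in the `start()` method above.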