Can you share complete Consumer configuration? Also what does stack-trace show, is it stuck in poll?
I think I have seen this before in one of my tests, in my case I had pinned it to side effect of https://issues.apache.org/jira/browse/KAFKA-1894. Essentially poll(timeout) doesn't always respect the timeout :-| -- Shrijeet On Tue, Mar 29, 2016 at 3:42 PM, Ratha v <vijayara...@gmail.com> wrote: > Hi oleg; > > 1. The consumer shell (The default tool :./kafka-console-consumer.sh > --zookeeper xxx:2181 --topic yy > ) consumes my messages. Only my programmed listener is not consuming ..I > provided the code in the previous thread. > > > Here is my producer (I believe this works, because, the* kafka consumer.sh > * > consumes) > > > > > import java.io.FileNotFoundException; > import java.util.Properties; > > import com.xx.core.impl.KafkaConfigurationLoader; > import com.xx.core.model.base.RawFile; > > import org.apache.kafka.clients.producer.KafkaProducer; > import org.apache.kafka.clients.producer.ProducerRecord; > > /** > * Class to produce some test messages to the Kafka server > * > * @author ratha > * > */ > public class KafkaMessageProducer { > private Properties properties; > private String topic; > private File file; > private KafkaProducer<String, File> producer; > > public KafkaMessageProducer(String topic, File file) { > this.topic = topic; > this.file = file; > > KafkaConfigurationLoader confLoader = new KafkaConfigurationLoader(); > try { > properties = confLoader.loadProducerConfig(); > > producer = new KafkaProducer<>(properties); > } catch (FileNotFoundException e) { > e.printStackTrace(); > } > } > > public void generateMessgaes() { > try { > for(int i=0; i<10;i++){ > producer.send(new ProducerRecord<String, File>(topic, file)); > } > } catch (Exception e) { > e.printStackTrace(); > System.out.println("Error in publishing messages to the topic : " + topic); > > } finally { > producer.close(); > } > } > > } > > Thanks > > > On 30 March 2016 at 09:31, Oleg Zhurakousky <ozhurakou...@hortonworks.com> > wrote: > > > Ratha > > > > " I published messages and started the listener ---> No success”. > > > > I would stop right here as you already see an issue. So I am assuming the > > listener you started is console listener and with that I am going to > assume > > it works providing message is on the topic. And since nothing happened > then > > you probably have issue in producer. > > > > Could you post producer code. > > > > Oleg > > > > > On Mar 29, 2016, at 6:24 PM, Ratha v <vijayara...@gmail.com> wrote: > > > > > > Hi Oleg; > > > > > > Thanks for the guide..Here is my troubleshooting steps; > > > > > > > > > 1. I published messages and started the listener ---> No success > > > 2. I set the "auto.offset.reset" property to *earliest --> No > success* > > > 3. *Set the "consumer.timeout" to 1 --> Nothing happens* > > > 4. *T*hought may be my messages are not published, so started the > > > consumer shell script (default tool in the kafka distribution), that > > > consumes my messages well. > > > 5. Changed hasnext() to while (true) --> no change in the behaviour > > > > > > > > > Im really confused..:( > > > > > > Here is the code; > > > > > > *executor* > > > > > > public void start() { > > > > > > List<String> topics = Arrays.asList(topic); > > > > > > ExecutorService executor = Executors.newFixedThreadPool(CoreConstants. > > > THREAD_SIZE); > > > > > > ListenerThread lThread = new ListenerThread(topics, properties); > > > > > > executor.submit(lThread); > > > > > > > > > Runtime.getRuntime().addShutdownHook(new Thread() { > > > > > > @Override > > > > > > public void run() { > > > > > > lThread.shutdown(); > > > > > > executor.shutdown(); > > > > > > try { > > > > > > executor.awaitTermination(5000, TimeUnit.MILLISECONDS); > > > > > > } catch (InterruptedException e) { > > > > > > e.printStackTrace(); > > > > > > } > > > > > > } > > > > > > }); > > > > > > } > > > > > > > > > > > > *Thread* > > > > > > > > > > > > public void run() { > > > > > > try { > > > > > > consumer.subscribe(topics); > > > > > > while (true) { > > > > > > ConsumerRecords<String, File> records = consumer.poll(100); > > > > > > System.out.println("&&&&&&2222 : "+records.count()); > > > > > > for (ConsumerRecord<String, RawFile> record : records) { > > > > > > System.out.println("&&&&&&333"); > > > > > > FileProcessor processor = new FileProcessor(); > > > > > > processor.processFile(record.value()); > > > > > > > > > System.out.println("&&&&&&"+record.value()); > > > > > > } > > > > > > } > > > > > > > > > } catch (Throwable e) { > > > > > > e.printStackTrace(); > > > > > > System.out.println("eror in polling"); > > > > > > // ignore for shutdown > > > > > > } finally { > > > > > > consumer.close(); > > > > > > } > > > > > > > > > > > > Do you think any other issues from my end? > > > > > > Thanks > > > > > > > > > On 29 March 2016 at 23:13, Oleg Zhurakousky < > > ozhurakou...@hortonworks.com> > > > wrote: > > > > > >> Ratha > > >> > > >> It appears you have couple of issues here, so I’ll start with the > > consumer > > >> first. > > >> If you do a search on this mailing list on “Consumer deadlock” in the > > >> subject you’ll find a thread where similar symptoms were discussed. > > >> Basically the hasNext() method you mentioned is implemented as a > > blocking > > >> call and while we may all have opinion about that decision and why > > Iterator > > >> was chosen in the first place, it is what it is. But from what I > > understand > > >> it simply means that there are no messages to poll from the topic > (yes I > > >> know, hasNext()=false seems natural here but. . .). What you can do is > > set ‘ > > >> consumer.timeout.ms’ property to value such as ‘1’. By doing so you > are > > >> stating that you are willing to block for no longer then 1 > millisecond. > > >> But you also mention that you are sending message to the topic and > > >> therefore have reasonable expectation to poll something from it but > yet > > >> you’re blocking. That is strange indeed. What I would suggest is to > try > > one > > >> thing at the time. Use your Java producer, in conjunction with console > > >> consumer. This will help to narrow down the problem (e.g., issue with > > >> producer that may not be actually sending). Then hopefully if you > > receive > > >> successfully then you know the problem is in the consumer and so on. > > >> > > >> Cheers > > >> Oleg > > >> > > >> On Mar 29, 2016, at 12:48 AM, Ratha v <vijayara...@gmail.com<mailto: > > >> vijayara...@gmail.com>> wrote: > > >> > > >> Hi all; > > >> I publish a java object and try to consume it.. > > >> I have a poll method to consume objects, but it never returns any > > >> objects..My program runs forever.(?) > > >> > > >> *ConsumerThread* > > >> > > >> public void run() { > > >> > > >> try { > > >> > > >> consumer.subscribe(topics); > > >> > > >> > > >> Iterator<ConsumerRecord<String, File>> it = > > consumer.poll(1000).iterator(); > > >> > > >> while (it.hasNext()) { > > >> > > >> ConsumerRecord<String, File> record = it.next(); > > >> > > >> FileProcessor processor = new FileProcessor(); > > >> > > >> processor.processFile(record.value()); > > >> > > >> System.out.println("-----" + ": " + record); > > >> > > >> > > >> > > >> When i debug it never goes inside the while loop. Why is that? > > >> > > >> I publish object like this; > > >> > > >> producer.send(new ProducerRecord<String, File>(topic, file)); > > >> > > >> > > >> Thanks > > >> -- > > >> -Ratha > > >> http://vvratha.blogspot.com/ > > >> > > >> > > > > > > > > > -- > > > -Ratha > > > http://vvratha.blogspot.com/ > > > > > > > -- > -Ratha > http://vvratha.blogspot.com/ >