Hi Shrijeet;

I do not see any stack trace. It is stuck at poll.
Here is my consumer conf;

zookeeper.connect=xx.com\:2181
metadata.broker.list=yy.com\:9092
enable.auto.commit=true
auto.commit.interval.ms=101
max.partition.fetch.bytes=35
session.timeout.ms=3001
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=com.xx.process.RawFileSerializer
heartbeat.interval.ms=1000
auto.offset.reset=earliest
serializer.class=com.xx.process.FileSerializer
bootstrap.servers=yy.com\:9092
group.id=test
consumer.timeout.ms=1

Thanks.

On 30 March 2016 at 10:10, Shrijeet Paliwal <shrijeet.pali...@gmail.com> wrote:

> Can you share complete Consumer configuration? Also what does stack-trace
> show, is it stuck in poll?
>
> I think I have seen this before in one of my tests, in my case I had pinned
> it to side effect of https://issues.apache.org/jira/browse/KAFKA-1894.
> Essentially poll(timeout) doesn't always respect the timeout :-|
>
> --
> Shrijeet
>
> On Tue, Mar 29, 2016 at 3:42 PM, Ratha v <vijayara...@gmail.com> wrote:
>
> > Hi oleg;
> >
> > 1. The consumer shell (The default tool :./kafka-console-consumer.sh
> > --zookeeper xxx:2181 --topic yy) consumes my messages. Only my programmed
> > listener is not consuming ..I provided the code in the previous thread.
> >
> > Here is my producer (I believe this works, because the *kafka consumer.sh*
> > consumes)
> >
> > import java.io.FileNotFoundException;
> > import java.util.Properties;
> >
> > import com.xx.core.impl.KafkaConfigurationLoader;
> > import com.xx.core.model.base.RawFile;
> >
> > import org.apache.kafka.clients.producer.KafkaProducer;
> > import org.apache.kafka.clients.producer.ProducerRecord;
> >
> > /**
> >  * Class to produce some test messages to the Kafka server
> >  *
> >  * @author ratha
> >  *
> >  */
> > public class KafkaMessageProducer {
> >     private Properties properties;
> >     private String topic;
> >     private File file;
> >     private KafkaProducer<String, File> producer;
> >
> >     public KafkaMessageProducer(String topic, File file) {
> >         this.topic = topic;
> >         this.file = file;
> >
> >         KafkaConfigurationLoader confLoader = new KafkaConfigurationLoader();
> >         try {
> >             properties = confLoader.loadProducerConfig();
> >
> >             producer = new KafkaProducer<>(properties);
> >         } catch (FileNotFoundException e) {
> >             e.printStackTrace();
> >         }
> >     }
> >
> >     public void generateMessgaes() {
> >         try {
> >             for(int i=0; i<10;i++){
> >                 producer.send(new ProducerRecord<String, File>(topic, file));
> >             }
> >         } catch (Exception e) {
> >             e.printStackTrace();
> >             System.out.println("Error in publishing messages to the topic : " + topic);
> >
> >         } finally {
> >             producer.close();
> >         }
> >     }
> >
> > }
> >
> > Thanks
> >
> >
> > On 30 March 2016 at 09:31, Oleg Zhurakousky <ozhurakou...@hortonworks.com>
> > wrote:
> >
> > > Ratha
> > >
> > > “I published messages and started the listener ---> No success”.
> > >
> > > I would stop right here as you already see an issue. So I am assuming the
> > > listener you started is console listener and with that I am going to assume
> > > it works providing message is on the topic. And since nothing happened then
> > > you probably have issue in producer.
> > >
> > > Could you post producer code.
> > >
> > > Oleg
> > >
> > > > On Mar 29, 2016, at 6:24 PM, Ratha v <vijayara...@gmail.com> wrote:
> > > >
> > > > Hi Oleg;
> > > >
> > > > Thanks for the guide. Here are my troubleshooting steps:
> > > >
> > > >    1. I published messages and started the listener ---> No success
> > > >    2. I set the "auto.offset.reset" property to earliest --> No success
> > > >    3. Set the "consumer.timeout" to 1 --> Nothing happens
> > > >    4. Thought maybe my messages were not published, so I started the
> > > >       consumer shell script (default tool in the Kafka distribution),
> > > >       which consumes my messages well.
> > > >    5. Changed hasnext() to while (true) --> no change in the behaviour
> > > >
> > > > I'm really confused..:(
> > > >
> > > > Here is the code;
> > > >
> > > > *executor*
> > > >
> > > > public void start() {
> > > >     List<String> topics = Arrays.asList(topic);
> > > >     ExecutorService executor = Executors.newFixedThreadPool(CoreConstants.THREAD_SIZE);
> > > >     ListenerThread lThread = new ListenerThread(topics, properties);
> > > >     executor.submit(lThread);
> > > >
> > > >     Runtime.getRuntime().addShutdownHook(new Thread() {
> > > >         @Override
> > > >         public void run() {
> > > >             lThread.shutdown();
> > > >             executor.shutdown();
> > > >             try {
> > > >                 executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
> > > >             } catch (InterruptedException e) {
> > > >                 e.printStackTrace();
> > > >             }
> > > >         }
> > > >     });
> > > > }
> > > >
> > > > *Thread*
> > > >
> > > > public void run() {
> > > >     try {
> > > >         consumer.subscribe(topics);
> > > >         while (true) {
> > > >             ConsumerRecords<String, File> records = consumer.poll(100);
> > > >             System.out.println("&&&&&&2222 : "+records.count());
> > > >             for (ConsumerRecord<String, RawFile> record : records) {
> > > >                 System.out.println("&&&&&&333");
> > > >                 FileProcessor processor = new FileProcessor();
> > > >                 processor.processFile(record.value());
> > > >
> > > >                 System.out.println("&&&&&&"+record.value());
> > > >             }
> > > >         }
> > > >
> > > >     } catch (Throwable e) {
> > > >         e.printStackTrace();
> > > >         System.out.println("eror in polling");
> > > >         // ignore for shutdown
> > > >     } finally {
> > > >         consumer.close();
> > > >     }
> > > >
> > > > Do you see any other issues on my end?
> > > >
> > > > Thanks
> > > >
> > > >
> > > > On 29 March 2016 at 23:13, Oleg Zhurakousky <ozhurakou...@hortonworks.com>
> > > > wrote:
> > > >
> > > >> Ratha
> > > >>
> > > >> It appears you have a couple of issues here, so I’ll start with the
> > > >> consumer first.
> > > >> If you do a search on this mailing list on “Consumer deadlock” in the
> > > >> subject you’ll find a thread where similar symptoms were discussed.
> > > >> Basically the hasNext() method you mentioned is implemented as a blocking
> > > >> call and while we may all have an opinion about that decision and why
> > > >> Iterator was chosen in the first place, it is what it is. But from what I
> > > >> understand it simply means that there are no messages to poll from the
> > > >> topic (yes I know, hasNext()=false seems natural here but. . .). What you
> > > >> can do is set the ‘consumer.timeout.ms’ property to a value such as ‘1’.
> > > >> By doing so you are stating that you are willing to block for no longer
> > > >> than 1 millisecond.
> > > >> But you also mention that you are sending a message to the topic and
> > > >> therefore have a reasonable expectation to poll something from it, and
> > > >> yet you’re blocking. That is strange indeed. What I would suggest is to
> > > >> try one thing at a time. Use your Java producer in conjunction with the
> > > >> console consumer. This will help to narrow down the problem (e.g., an
> > > >> issue with a producer that may not actually be sending). Then, if you
> > > >> receive successfully, you know the problem is in the consumer, and so on.
> > > >>
> > > >> Cheers
> > > >> Oleg
> > > >>
> > > >> On Mar 29, 2016, at 12:48 AM, Ratha v <vijayara...@gmail.com> wrote:
> > > >>
> > > >> Hi all;
> > > >> I publish a Java object and try to consume it.
> > > >> I have a poll method to consume objects, but it never returns any
> > > >> objects. My program runs forever.(?)
> > > >>
> > > >> *ConsumerThread*
> > > >>
> > > >> public void run() {
> > > >>     try {
> > > >>         consumer.subscribe(topics);
> > > >>
> > > >>         Iterator<ConsumerRecord<String, File>> it = consumer.poll(1000).iterator();
> > > >>         while (it.hasNext()) {
> > > >>             ConsumerRecord<String, File> record = it.next();
> > > >>             FileProcessor processor = new FileProcessor();
> > > >>             processor.processFile(record.value());
> > > >>             System.out.println("-----" + ": " + record);
> > > >>
> > > >> When I debug, it never goes inside the while loop. Why is that?
> > > >>
> > > >> I publish the object like this:
> > > >>
> > > >> producer.send(new ProducerRecord<String, File>(topic, file));
> > > >>
> > > >>
> > > >> Thanks
> > > >> --
> > > >> -Ratha
> > > >> http://vvratha.blogspot.com/
> > > >
> > > > --
> > > > -Ratha
> > > > http://vvratha.blogspot.com/
> >
> > --
> > -Ratha
> > http://vvratha.blogspot.com/

--
-Ratha
http://vvratha.blogspot.com/
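
A note on the consumer conf at the top of this thread: it appears to mix keys for the new KafkaConsumer (bootstrap.servers, group.id, key.deserializer, ...) with keys from the older ZooKeeper-based consumer and the old producer. The sketch below is not from the thread. It uses only java.util.Properties to list which supplied keys fall into the older group. The class name ConsumerConfCheck and the LEGACY_KEYS set are hypothetical, and the classification of those four keys is an assumption that should be checked against the documentation for the client version in use.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

public class ConsumerConfCheck {
    // Assumption: these keys belong to the old consumer/producer clients
    // and are not settings of the new KafkaConsumer.
    static final Set<String> LEGACY_KEYS = new HashSet<>(Arrays.asList(
            "zookeeper.connect",
            "metadata.broker.list",
            "serializer.class",
            "consumer.timeout.ms"));

    // Parse .properties-style text; "\:" in a value is unescaped to ":".
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Return the supplied keys that are in LEGACY_KEYS, sorted by name.
    static Set<String> legacyKeys(Properties props) {
        Set<String> found = new TreeSet<>(props.stringPropertyNames());
        found.retainAll(LEGACY_KEYS);
        return found;
    }

    public static void main(String[] args) {
        // The conf from the top of the thread, one key per line.
        String conf = String.join("\n",
                "zookeeper.connect=xx.com\\:2181",
                "metadata.broker.list=yy.com\\:9092",
                "enable.auto.commit=true",
                "auto.commit.interval.ms=101",
                "max.partition.fetch.bytes=35",
                "session.timeout.ms=3001",
                "key.deserializer=org.apache.kafka.common.serialization.StringDeserializer",
                "value.deserializer=com.xx.process.RawFileSerializer",
                "heartbeat.interval.ms=1000",
                "auto.offset.reset=earliest",
                "serializer.class=com.xx.process.FileSerializer",
                "bootstrap.servers=yy.com\\:9092",
                "group.id=test",
                "consumer.timeout.ms=1");

        Properties props = parse(conf);
        System.out.println("Keys that look like old-client settings: " + legacyKeys(props));
        System.out.println("bootstrap.servers = " + props.getProperty("bootstrap.servers"));
    }
}
```

If that assumption holds, setting consumer.timeout.ms would likely have no effect on consumer.poll(), since the timeout there is the argument passed to poll itself.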