Can you share your complete consumer configuration? Also, what does the stack
trace show: is it stuck in poll()?
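
If attaching a debugger is awkward, running `jstack <pid>` against the JVM gives you the stack of every thread. The same information can be printed from inside the program. Below is a minimal sketch using only the JDK; `ThreadDump` and its `dump` method are hypothetical names for illustration and are not part of Kafka.

```java
import java.util.Map;

// Hypothetical helper (not a Kafka API): collects the stack traces of live
// threads so you can check whether the listener thread is sitting in poll().
class ThreadDump {

    /** Returns a text dump of every live thread whose name contains nameFilter. */
    static String dump(String nameFilter) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> entry
                : Thread.getAllStackTraces().entrySet()) {
            Thread thread = entry.getKey();
            if (!thread.getName().contains(nameFilter)) {
                continue; // skip threads we are not interested in
            }
            sb.append('"').append(thread.getName()).append('"')
              .append(" state=").append(thread.getState()).append('\n');
            for (StackTraceElement frame : entry.getValue()) {
                sb.append("    at ").append(frame).append('\n');
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // An empty filter matches every thread name.
        System.out.print(dump(""));
    }
}
```

Calling `ThreadDump.dump("pool-")` from a shutdown hook or a timer would show the executor threads only, assuming the default thread names of `Executors.newFixedThreadPool`. If the top frames of the listener thread are inside the consumer's poll call, that confirms where it is blocked.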

I think I have seen this before in one of my tests. In my case I traced it
to a side effect of https://issues.apache.org/jira/browse/KAFKA-1894.
Essentially, poll(timeout) doesn't always respect the timeout :-|
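
One way to confirm that a call is overrunning its timeout is to run it under an external deadline. The sketch below is JDK-only and uses a plain `Callable` as a stand-in for the blocking call; `BoundedCall` and `callWithDeadline` are hypothetical names. It is meant for diagnosis, not as a fix: KafkaConsumer is not thread-safe, and the supported way to abort a blocked poll from another thread is `consumer.wakeup()`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper (not a Kafka API): enforces a deadline on a blocking call.
class BoundedCall {

    /**
     * Runs the call on a worker thread and waits at most timeoutMs for it.
     * Returns the call's result, or fallback if the deadline passes first.
     */
    static <T> T callWithDeadline(Callable<T> call, long timeoutMs, T fallback) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        Future<T> future = worker.submit(call);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the worker; the call overran
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("blocking call failed", e.getCause());
        } finally {
            worker.shutdownNow(); // do not leave the worker thread behind
        }
    }

    public static void main(String[] args) {
        // Stand-in for a call that ignores its own timeout.
        String result = callWithDeadline(() -> {
            Thread.sleep(5_000);
            return "returned";
        }, 100, "overran the deadline");
        System.out.println(result);
    }
}
```

Logging whenever the fallback is returned gives a simple record of how often, and for how long, the wrapped call exceeds the timeout it was given.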

--
Shrijeet

On Tue, Mar 29, 2016 at 3:42 PM, Ratha v <vijayara...@gmail.com> wrote:

> Hi Oleg,
>
>    1. The console consumer (the default tool: ./kafka-console-consumer.sh
>    --zookeeper xxx:2181 --topic yy) consumes my messages. Only my programmed
>    listener is not consuming. I provided the code in the previous thread.
>
>
> Here is my producer (I believe this works, because kafka-console-consumer.sh
> consumes the messages):
>
>
>
>
> import java.io.File;
> import java.io.FileNotFoundException;
> import java.util.Properties;
>
> import com.xx.core.impl.KafkaConfigurationLoader;
> import com.xx.core.model.base.RawFile;
>
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> /**
>  * Class to produce some test messages to the Kafka server
>  *
>  * @author ratha
>  *
>  */
> public class KafkaMessageProducer {
>     private Properties properties;
>     private String topic;
>     private File file;
>     private KafkaProducer<String, File> producer;
>
>     public KafkaMessageProducer(String topic, File file) {
>         this.topic = topic;
>         this.file = file;
>
>         KafkaConfigurationLoader confLoader = new KafkaConfigurationLoader();
>         try {
>             // Load the producer settings and create the producer from them
>             properties = confLoader.loadProducerConfig();
>             producer = new KafkaProducer<>(properties);
>         } catch (FileNotFoundException e) {
>             e.printStackTrace();
>         }
>     }
>
>     public void generateMessgaes() {
>         try {
>             // Send the same file as the record value ten times (no key)
>             for (int i = 0; i < 10; i++) {
>                 producer.send(new ProducerRecord<String, File>(topic, file));
>             }
>         } catch (Exception e) {
>             e.printStackTrace();
>             System.out.println("Error in publishing messages to the topic : " + topic);
>         } finally {
>             producer.close();
>         }
>     }
> }
>
> Thanks
>
>
> On 30 March 2016 at 09:31, Oleg Zhurakousky <ozhurakou...@hortonworks.com>
> wrote:
>
> > Ratha
> >
> > "I published messages and started the listener ---> No success".
> >
> > I would stop right here, as you already see an issue. I am assuming the
> > listener you started is the console listener, and with that I am going to
> > assume it works provided a message is on the topic. And since nothing
> > happened, you probably have an issue in the producer.
> >
> > Could you post the producer code?
> >
> > Oleg
> >
> > > On Mar 29, 2016, at 6:24 PM, Ratha v <vijayara...@gmail.com> wrote:
> > >
> > > Hi Oleg,
> > >
> > > Thanks for the guide. Here are my troubleshooting steps:
> > >
> > >
> > >   1. I published messages and started the listener ---> No success
> > >   2. I set the "auto.offset.reset" property to earliest --> No success
> > >   3. Set "consumer.timeout" to 1 --> Nothing happens
> > >   4. Thought maybe my messages are not published, so I started the
> > >   consumer shell script (default tool in the Kafka distribution), which
> > >   consumes my messages well.
> > >   5. Changed hasNext() to while (true) --> No change in the behaviour
> > >
> > >
> > > I'm really confused. :(
> > >
> > > Here is the code:
> > >
> > > *executor*
> > >
> > > public void start() {
> > >     List<String> topics = Arrays.asList(topic);
> > >     ExecutorService executor =
> > >             Executors.newFixedThreadPool(CoreConstants.THREAD_SIZE);
> > >     ListenerThread lThread = new ListenerThread(topics, properties);
> > >     executor.submit(lThread);
> > >
> > >     Runtime.getRuntime().addShutdownHook(new Thread() {
> > >         @Override
> > >         public void run() {
> > >             lThread.shutdown();
> > >             executor.shutdown();
> > >             try {
> > >                 executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
> > >             } catch (InterruptedException e) {
> > >                 e.printStackTrace();
> > >             }
> > >         }
> > >     });
> > > }
> > >
> > >
> > >
> > > *Thread*
> > >
> > >
> > >
> > > public void run() {
> > >     try {
> > >         consumer.subscribe(topics);
> > >         while (true) {
> > >             ConsumerRecords<String, File> records = consumer.poll(100);
> > >             System.out.println("&&&&&&2222 : " + records.count());
> > >             for (ConsumerRecord<String, File> record : records) {
> > >                 System.out.println("&&&&&&333");
> > >                 FileProcessor processor = new FileProcessor();
> > >                 processor.processFile(record.value());
> > >                 System.out.println("&&&&&&" + record.value());
> > >             }
> > >         }
> > >     } catch (Throwable e) {
> > >         e.printStackTrace();
> > >         System.out.println("error in polling");
> > >         // ignore for shutdown
> > >     } finally {
> > >         consumer.close();
> > >     }
> > > }
> > >
> > >
> > >
> > > Do you see any other issues on my end?
> > >
> > > Thanks
> > >
> > >
> > > On 29 March 2016 at 23:13, Oleg Zhurakousky <ozhurakou...@hortonworks.com>
> > > wrote:
> > >
> > >> Ratha
> > >>
> > >> It appears you have a couple of issues here, so I’ll start with the
> > >> consumer first.
> > >> If you search this mailing list for “Consumer deadlock” in the subject,
> > >> you’ll find a thread where similar symptoms were discussed.
> > >> Basically, the hasNext() method you mentioned is implemented as a
> > >> blocking call, and while we may all have opinions about that decision
> > >> and why Iterator was chosen in the first place, it is what it is. From
> > >> what I understand, it simply means that there are no messages to poll
> > >> from the topic (yes, I know, hasNext()=false seems natural here,
> > >> but...). What you can do is set the ‘consumer.timeout.ms’ property to a
> > >> value such as ‘1’. By doing so you are stating that you are willing to
> > >> block for no longer than 1 millisecond.
> > >> But you also mention that you are sending messages to the topic and
> > >> therefore have a reasonable expectation to poll something from it, yet
> > >> you’re blocking. That is strange indeed. What I would suggest is to try
> > >> one thing at a time. Use your Java producer in conjunction with the
> > >> console consumer. This will help to narrow down the problem (e.g., an
> > >> issue with the producer, which may not actually be sending). Then, if
> > >> you receive successfully, you know the problem is in the consumer, and
> > >> so on.
> > >>
> > >> Cheers
> > >> Oleg
> > >>
> > >> On Mar 29, 2016, at 12:48 AM, Ratha v <vijayara...@gmail.com> wrote:
> > >>
> > >> Hi all,
> > >> I publish a Java object and try to consume it.
> > >> I have a poll method to consume objects, but it never returns any
> > >> objects. My program runs forever (?).
> > >>
> > >> *ConsumerThread*
> > >>
> > >> public void run() {
> > >>     try {
> > >>         consumer.subscribe(topics);
> > >>
> > >>         Iterator<ConsumerRecord<String, File>> it =
> > >>                 consumer.poll(1000).iterator();
> > >>         while (it.hasNext()) {
> > >>             ConsumerRecord<String, File> record = it.next();
> > >>             FileProcessor processor = new FileProcessor();
> > >>             processor.processFile(record.value());
> > >>             System.out.println("-----" + ": " + record);
> > >>
> > >>
> > >>
> > >> When I debug, it never goes inside the while loop. Why is that?
> > >>
> > >> I publish the object like this:
> > >>
> > >> producer.send(new ProducerRecord<String, File>(topic, file));
> > >>
> > >>
> > >> Thanks
> > >> --
> > >> -Ratha
> > >> http://vvratha.blogspot.com/
> > >>
> > >>
> > >
> > >
> > > --
> > > -Ratha
> > > http://vvratha.blogspot.com/
> >
> >
>
>
> --
> -Ratha
> http://vvratha.blogspot.com/
>
