Ratha

"I published messages and started the listener ---> No success".

I would stop right here, as you can already see an issue. I am assuming the
listener you started is the console listener, and I am further assuming that it
works provided a message is on the topic. Since nothing happened, you
probably have an issue in the producer.
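
One producer-side thing worth ruling out is whether your File value survives the trip to bytes and back. Below is a minimal sketch using plain Java serialization. It is not your actual serializer (which I have not seen); the class name RoundTripCheck, the helpers toBytes/fromBytes, and the path "example.txt" are all made up for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class RoundTripCheck {

    // Turn a Serializable value into bytes, roughly what a custom
    // value serializer would have to do before the producer sends it.
    static byte[] toBytes(Serializable value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        }
        return buffer.toByteArray();
    }

    // Rebuild the value from bytes, roughly what the matching
    // deserializer would do on the consumer side.
    static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        File original = new File("example.txt"); // hypothetical path, need not exist
        File copy = (File) fromBytes(toBytes(original));
        System.out.println("round trip preserved the value: " + original.equals(copy));
    }
}
```

Keep in mind that java.io.File serializes only its path, not the file's contents, so a consumer on another machine may receive a path that points to nothing there.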

Could you post the producer code?
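
For comparison, here is a rough sketch of the settings I would expect a producer to be built with. The broker address and the value serializer class name are placeholders I made up, and ProducerSettingsSketch/producerSettings are hypothetical names; only the property keys and the StringSerializer class come from the Kafka client itself.

```java
import java.util.Properties;

public class ProducerSettingsSketch {

    // Builds the settings that would be handed to a KafkaProducer constructor.
    static Properties producerSettings(String bootstrapServers, String valueSerializerClass) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Must name a class that can turn your value type into bytes.
        props.setProperty("value.serializer", valueSerializerClass);
        // Wait for the full set of in-sync replicas to acknowledge each record.
        props.setProperty("acks", "all");
        return props;
    }

    public static void main(String[] args) {
        // Both arguments are placeholders, not values taken from this thread.
        Properties props = producerSettings("localhost:9092", "com.example.FileSerializer");
        props.list(System.out);
    }
}
```

Also note that send() is asynchronous and returns a Future. Calling get() on that Future blocks until the broker acknowledges the record or an exception is thrown, and closing the producer before the program exits gives buffered records a chance to go out.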

Oleg

> On Mar 29, 2016, at 6:24 PM, Ratha v <vijayara...@gmail.com> wrote:
> 
> Hi Oleg;
> 
> Thanks for the guidance. Here are my troubleshooting steps:
> 
> 
>   1. I published messages and started the listener --> No success
>   2. I set the "auto.offset.reset" property to earliest --> No success
>   3. Set the "consumer.timeout" to 1 --> Nothing happens
>   4. Thought maybe my messages were not being published, so I started the
>   consumer shell script (the default tool in the Kafka distribution), and
>   that consumes my messages well.
>   5. Changed hasNext() to while (true) --> No change in the behaviour
> 
> 
> I'm really confused. :(
> 
> Here is the code;
> 
> *executor*
> 
> public void start() {
>     List<String> topics = Arrays.asList(topic);
>     ExecutorService executor = Executors.newFixedThreadPool(CoreConstants.THREAD_SIZE);
>     ListenerThread lThread = new ListenerThread(topics, properties);
>     executor.submit(lThread);
>
>     Runtime.getRuntime().addShutdownHook(new Thread() {
>         @Override
>         public void run() {
>             lThread.shutdown();
>             executor.shutdown();
>             try {
>                 executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
>             } catch (InterruptedException e) {
>                 e.printStackTrace();
>             }
>         }
>     });
> }
> 
> 
> 
> *Thread*
> 
> 
> 
> public void run() {
>     try {
>         consumer.subscribe(topics);
>         while (true) {
>             ConsumerRecords<String, File> records = consumer.poll(100);
>             System.out.println("&&&&&&2222 : " + records.count());
>             for (ConsumerRecord<String, File> record : records) {
>                 System.out.println("&&&&&&333");
>                 FileProcessor processor = new FileProcessor();
>                 processor.processFile(record.value());
>                 System.out.println("&&&&&&" + record.value());
>             }
>         }
>     } catch (Throwable e) {
>         e.printStackTrace();
>         System.out.println("error in polling");
>         // ignore for shutdown
>     } finally {
>         consumer.close();
>     }
> }
> 
> 
> 
> Do you see any other issues on my end?
> 
> Thanks
> 
> 
> On 29 March 2016 at 23:13, Oleg Zhurakousky <ozhurakou...@hortonworks.com>
> wrote:
> 
>> Ratha
>> 
>> It appears you have a couple of issues here, so I’ll start with the
>> consumer.
>> If you search this mailing list for “Consumer deadlock” in the subject,
>> you’ll find a thread where similar symptoms were discussed.
>> Basically, the hasNext() method you mentioned is implemented as a blocking
>> call, and while we may all have opinions about that decision and why
>> Iterator was chosen in the first place, it is what it is. From what I
>> understand, blocking simply means that there are no messages to poll from
>> the topic (yes, I know, hasNext()=false seems natural here, but...). What
>> you can do is set the ‘consumer.timeout.ms’ property to a value such as
>> ‘1’. By doing so you are stating that you are willing to block for no
>> longer than 1 millisecond.
>> But you also mention that you are sending messages to the topic and
>> therefore have a reasonable expectation of polling something from it, yet
>> you’re blocking. That is strange indeed. What I would suggest is to try one
>> thing at a time. Use your Java producer in conjunction with the console
>> consumer. This will help narrow down the problem (e.g., an issue with a
>> producer that may not actually be sending). If you then receive
>> successfully, you know the problem is in the consumer, and so on.
>> 
>> Cheers
>> Oleg
>> 
>> On Mar 29, 2016, at 12:48 AM, Ratha v <vijayara...@gmail.com> wrote:
>> 
>> Hi all,
>> I publish a Java object and try to consume it.
>> I have a poll method to consume objects, but it never returns any
>> objects. My program runs forever (?).
>> 
>> *ConsumerThread*
>> 
>> public void run() {
>>     try {
>>         consumer.subscribe(topics);
>>
>>         Iterator<ConsumerRecord<String, File>> it = consumer.poll(1000).iterator();
>>         while (it.hasNext()) {
>>             ConsumerRecord<String, File> record = it.next();
>>             FileProcessor processor = new FileProcessor();
>>             processor.processFile(record.value());
>>             System.out.println("-----" + ": " + record);
>> 
>> When I debug, it never enters the while loop. Why is that?
>> 
>> I publish the object like this:
>> 
>> producer.send(new ProducerRecord<String, File>(topic, file));
>> 
>> 
>> Thanks
>> --
>> -Ratha
>> http://vvratha.blogspot.com/
>> 
>> 
> 
> 
> -- 
> -Ratha
> http://vvratha.blogspot.com/
