[ 
https://issues.apache.org/jira/browse/KAFKA-1632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

aarti gupta updated KAFKA-1632:
-------------------------------
    Description: 
The following error is thrown when I call KafkaStream.head(), as shown in the code snippet below:

 WARN - java.lang.NoSuchMethodError: kafka.consumer.KafkaStream.head()Lkafka/message/MessageAndMetadata;

My use case is that I want to block on the receive() method and, when anything 
is published on the topic, hand the head of the queue to the calling method, 
which processes it.

I do not use partitioning and have a single stream.


import com.google.common.collect.Maps;
import x.x.x.Task;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.consumer.ZookeeperConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

/**
 * @author agupta
 */
public class KafkaConsumerDelegate implements ConsumerDelegate {

    private ConsumerConnector consumerConnector;
    private String topicName;
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerDelegate.class.getName());
    private final Map<String, Integer> topicCount = Maps.newHashMap();
    private Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams;
    private List<KafkaStream<byte[], byte[]>> kafkaStreams;


    @Override
    public Task receive(final boolean consumerConfirms) {
        try {
            LOG.info("Kafka consumer delegate listening on topic " + getTopicName());
            kafkaStreams = messageStreams.get(getTopicName());
            final KafkaStream<byte[], byte[]> kafkaStream = kafkaStreams.get(0);
            return Executors.newSingleThreadExecutor().submit(new Callable<Task>() {
                @Override
                public Task call() throws Exception {
                    final MessageAndMetadata<byte[], byte[]> messageAndMetadata = kafkaStream.head();
                    final Task message = new Task() {
                        @Override
                        public byte[] getBytes() {
                            return messageAndMetadata.message();
                        }
                    };
                    return message;
                }
            }).get();
        } catch (Exception e) {
            LOG.warn("Error in consumer " + e.getMessage());
        }
        return null;
    }

    @Override
    public void initialize(JSONObject configData, boolean publisherAckMode) throws IOException {
        try {
            this.topicName = configData.getString("topicName");
            LOG.info("Topic name is " + topicName);
        } catch (JSONException e) {
            e.printStackTrace();
            LOG.error("Error parsing configuration", e);
        }
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "localhost:2181");
        properties.put("group.id", "testgroup");
        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        // only one stream and one topic (since we are not supporting partitioning)
        topicCount.put(getTopicName(), 1);
        consumerConnector = new ZookeeperConsumerConnector(consumerConfig);
        messageStreams = consumerConnector.createMessageStreams(topicCount);
    }

    @Override
    public void stop() throws IOException {
        // TODO
        throw new UnsupportedOperationException("Method Not Implemented");
    }

    public String getTopicName() {
        return this.topicName;
    }
}
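The blocking hand-off that receive() is meant to implement can be sketched without Kafka at all, using a java.util.concurrent.BlockingQueue as a hypothetical stand-in for the stream (the real KafkaStream's head()/iterator() semantics are not reproduced here; this only illustrates the block-until-published pattern described above):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Minimal sketch of the blocking receive() pattern, with a BlockingQueue
 * standing in for the KafkaStream. receive() blocks until something is
 * published, then hands the head of the queue to the caller.
 */
public class BlockingReceiveSketch {

    private final BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(16);

    // Blocks until a message is available, then returns the head of the queue.
    public byte[] receive() throws InterruptedException {
        return queue.take();
    }

    // Stand-in for a message arriving on the topic.
    public void publish(byte[] message) throws InterruptedException {
        queue.put(message);
    }

    public static void main(String[] args) throws Exception {
        BlockingReceiveSketch sketch = new BlockingReceiveSketch();
        sketch.publish("hello".getBytes());
        System.out.println(new String(sketch.receive())); // prints "hello"
    }
}
```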



Lastly, I am using the following binary:

kafka_2.8.0-0.8.1.1

and the following Maven dependency:

  <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.1.1</version>
  </dependency>
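A NoSuchMethodError at runtime generally means the class loaded from the runtime classpath differs from the one compiled against. A quick reflective check can confirm which methods the loaded class actually exposes; the sketch below uses java.util.ArrayList as a stand-in class, since the Kafka jar is not assumed to be on the classpath (with the real jar present, one would pass "kafka.consumer.KafkaStream" and "head"):

```java
import java.lang.reflect.Method;

/**
 * Reflective check for whether a class on the runtime classpath declares a
 * public method with the given name. Useful for diagnosing NoSuchMethodError:
 * if this returns false for the method the code was compiled against, the
 * runtime jar does not match the compile-time dependency.
 */
public class ClasspathMethodCheck {

    public static boolean hasMethod(String className, String methodName)
            throws ClassNotFoundException {
        for (Method m : Class.forName(className).getMethods()) {
            if (m.getName().equals(methodName)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) throws Exception {
        // Stand-in check against a stdlib class.
        System.out.println(hasMethod("java.util.ArrayList", "iterator")); // prints "true"
    }
}
```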


Any suggestions?

Thanks
aarti

  was:
Hi Kafka team,

We use Kafka to send messages in a high-volume, memory-intensive application 
that uses the Parallel GC. We send messages at a rate of 12,500/min in the 
first few hours, after which the rate drops to 6,000/min. Our application 
usually runs for a maximum of 24 hours.

What we have observed:
1) When we do not send messages through the Kafka 0.8 producer, our 
application never slows down much and the entire process completes within 24 
hours.
2) When we use Kafka, our machines slow down to around 2,500 messages/min, 
and as time progresses the send rate drops even further.
3) We suspect that our application spends more time in GC, hence the problem. 
The heap dump does not contain a leak suspect involving Kafka, but this 
slowness happens only when the Kafka messaging system is used.

Any pointers that could help us resolve this issue will be highly appreciated.


> No such method error on KafkaStream.head
> ----------------------------------------
>
>                 Key: KAFKA-1632
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1632
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8.0
>            Reporter: aarti gupta
>            Priority: Critical
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
