[ 
https://issues.apache.org/jira/browse/KAFKA-1632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

aarti gupta updated KAFKA-1632:
-------------------------------
    Description: 
The following error is thrown when I call KafkaStream.head(), as shown in 
the code snippet below:

 WARN -  java.lang.NoSuchMethodError: 
kafka.consumer.KafkaStream.head()Lkafka/message/MessageAndMetadata;

My use case is that I want to block on the receive() method and, when a 
message is published on the topic, return the head of the queue to the 
calling method, which processes it.

I do not use partitioning and have a single stream.


import com.google.common.collect.Maps;
import x.x.x.Task;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.consumer.ZookeeperConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

/**
 * @author agupta
 */
public class KafkaConsumerDelegate implements ConsumerDelegate {

    private ConsumerConnector consumerConnector;
    private String topicName;
    private static Logger LOG = 
LoggerFactory.getLogger(KafkaProducerDelegate.class.getName());
    private final Map<String, Integer> topicCount = Maps.newHashMap();
    private Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams;
    private List<KafkaStream<byte[], byte[]>> kafkaStreams;


    @Override
    public Task receive(final boolean consumerConfirms) {
        try {
            LOG.info("Kafka consumer delegate listening on topic " + 
getTopicName());
            kafkaStreams = messageStreams.get(getTopicName());
            final KafkaStream<byte[], byte[]> kafkaStream = kafkaStreams.get(0);
            return Executors.newSingleThreadExecutor().submit(new 
Callable<Task>() {
                @Override
                public Task call() throws Exception {
                     final MessageAndMetadata<byte[], byte[]> 
messageAndMetadata= kafkaStream.head();

                        final Task message = new Task() {
                            @Override
                            public byte[] getBytes() {
                                return messageAndMetadata.message();

                            }
                        };
                        return message;
                    }
                }).get();

        } catch (Exception e) {
            LOG.warn("Error in consumer " + e.getMessage());
        }
        return null;
    }

    @Override
    public void initialize(JSONObject configData, boolean publisherAckMode) 
throws IOException {
        try {
            this.topicName = configData.getString("topicName");
            LOG.info("Topic name is " + topicName);
        } catch (JSONException e) {
            e.printStackTrace();
            LOG.error("Error parsing configuration", e);
        }
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "localhost:2181");
        properties.put("group.id", "testgroup");
        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        //only one stream, and one topic, (Since we are not supporting 
partitioning)
        topicCount.put(getTopicName(), 1);
        consumerConnector = new ZookeeperConsumerConnector(consumerConfig);
        messageStreams = consumerConnector.createMessageStreams(topicCount);
    }

    @Override
    public void stop() throws IOException {
//TODO
        throw new UnsupportedOperationException("Method Not Implemented");
    }

    public String getTopicName() {
        return this.topicName;
    }
}



Lastly, I am using the following binary 

kafka_2.8.0-0.8.1.1  

and the following maven dependency

  <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.1.1</version>
        </dependency>


Any suggestions?

Thanks
aarti

  was:
The following error is thrown when I call KafkaStream.head(), as shown in 
the code snippet below:

 WARN -  java.lang.NoSuchMethodError: 
kafka.consumer.KafkaStream.head()Lkafka/message/MessageAndMetadata;

My use case, is that I want to block on the receive() method, and when a 
message is published on the topic, I 'head' of the queue to the calling method, 
that processes it.

I do not use partitioning and have a single stream.


import com.google.common.collect.Maps;
import x.x.x.Task;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.consumer.ZookeeperConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

/**
 * @author agupta
 */
public class KafkaConsumerDelegate implements ConsumerDelegate {

    private ConsumerConnector consumerConnector;
    private String topicName;
    private static Logger LOG = 
LoggerFactory.getLogger(KafkaProducerDelegate.class.getName());
    private final Map<String, Integer> topicCount = Maps.newHashMap();
    private Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams;
    private List<KafkaStream<byte[], byte[]>> kafkaStreams;


    @Override
    public Task receive(final boolean consumerConfirms) {
        try {
            LOG.info("Kafka consumer delegate listening on topic " + 
getTopicName());
            kafkaStreams = messageStreams.get(getTopicName());
            final KafkaStream<byte[], byte[]> kafkaStream = kafkaStreams.get(0);
            return Executors.newSingleThreadExecutor().submit(new 
Callable<Task>() {
                @Override
                public Task call() throws Exception {
                     final MessageAndMetadata<byte[], byte[]> 
messageAndMetadata= kafkaStream.head();

                        final Task message = new Task() {
                            @Override
                            public byte[] getBytes() {
                                return messageAndMetadata.message();

                            }
                        };
                        return message;
                    }
                }).get();

        } catch (Exception e) {
            LOG.warn("Error in consumer " + e.getMessage());
        }
        return null;
    }

    @Override
    public void initialize(JSONObject configData, boolean publisherAckMode) 
throws IOException {
        try {
            this.topicName = configData.getString("topicName");
            LOG.info("Topic name is " + topicName);
        } catch (JSONException e) {
            e.printStackTrace();
            LOG.error("Error parsing configuration", e);
        }
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "localhost:2181");
        properties.put("group.id", "testgroup");
        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        //only one stream, and one topic, (Since we are not supporting 
partitioning)
        topicCount.put(getTopicName(), 1);
        consumerConnector = new ZookeeperConsumerConnector(consumerConfig);
        messageStreams = consumerConnector.createMessageStreams(topicCount);
    }

    @Override
    public void stop() throws IOException {
//TODO
        throw new UnsupportedOperationException("Method Not Implemented");
    }

    public String getTopicName() {
        return this.topicName;
    }
}



Lastly, I am using the following binary 

kafka_2.8.0-0.8.1.1  

and the following maven dependency

  <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.1.1</version>
        </dependency>


Any suggestions?

Thanks
aarti


> No such method error on KafkaStream.head
> ----------------------------------------
>
>                 Key: KAFKA-1632
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1632
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8.0
>            Reporter: aarti gupta
>
> The following error is thrown when I call KafkaStream.head(), as shown in 
> the code snippet below:
>  WARN -  java.lang.NoSuchMethodError: 
> kafka.consumer.KafkaStream.head()Lkafka/message/MessageAndMetadata;
> My use case, is that I want to block on the receive() method, and when a 
> message is published on the topic, I 'return the head' of the queue to the 
> calling method, that processes it.
> I do not use partitioning and have a single stream.
> import com.google.common.collect.Maps;
> import x.x.x.Task;
> import kafka.consumer.ConsumerConfig;
> import kafka.consumer.KafkaStream;
> import kafka.javaapi.consumer.ConsumerConnector;
> import kafka.javaapi.consumer.ZookeeperConsumerConnector;
> import kafka.message.MessageAndMetadata;
> import org.codehaus.jettison.json.JSONException;
> import org.codehaus.jettison.json.JSONObject;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> import java.io.IOException;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
> import java.util.concurrent.Callable;
> import java.util.concurrent.Executors;
> /**
>  * @author agupta
>  */
> public class KafkaConsumerDelegate implements ConsumerDelegate {
>     private ConsumerConnector consumerConnector;
>     private String topicName;
>     private static Logger LOG = 
> LoggerFactory.getLogger(KafkaProducerDelegate.class.getName());
>     private final Map<String, Integer> topicCount = Maps.newHashMap();
>     private Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams;
>     private List<KafkaStream<byte[], byte[]>> kafkaStreams;
>     @Override
>     public Task receive(final boolean consumerConfirms) {
>         try {
>             LOG.info("Kafka consumer delegate listening on topic " + 
> getTopicName());
>             kafkaStreams = messageStreams.get(getTopicName());
>             final KafkaStream<byte[], byte[]> kafkaStream = 
> kafkaStreams.get(0);
>             return Executors.newSingleThreadExecutor().submit(new 
> Callable<Task>() {
>                 @Override
>                 public Task call() throws Exception {
>                      final MessageAndMetadata<byte[], byte[]> 
> messageAndMetadata= kafkaStream.head();
>                         final Task message = new Task() {
>                             @Override
>                             public byte[] getBytes() {
>                                 return messageAndMetadata.message();
>                             }
>                         };
>                         return message;
>                     }
>                 }).get();
>         } catch (Exception e) {
>             LOG.warn("Error in consumer " + e.getMessage());
>         }
>         return null;
>     }
>     @Override
>     public void initialize(JSONObject configData, boolean publisherAckMode) 
> throws IOException {
>         try {
>             this.topicName = configData.getString("topicName");
>             LOG.info("Topic name is " + topicName);
>         } catch (JSONException e) {
>             e.printStackTrace();
>             LOG.error("Error parsing configuration", e);
>         }
>         Properties properties = new Properties();
>         properties.put("zookeeper.connect", "localhost:2181");
>         properties.put("group.id", "testgroup");
>         ConsumerConfig consumerConfig = new ConsumerConfig(properties);
>         //only one stream, and one topic, (Since we are not supporting 
> partitioning)
>         topicCount.put(getTopicName(), 1);
>         consumerConnector = new ZookeeperConsumerConnector(consumerConfig);
>         messageStreams = consumerConnector.createMessageStreams(topicCount);
>     }
>     @Override
>     public void stop() throws IOException {
> //TODO
>         throw new UnsupportedOperationException("Method Not Implemented");
>     }
>     public String getTopicName() {
>         return this.topicName;
>     }
> }
> Lastly, I am using the following binary 
> kafka_2.8.0-0.8.1.1  
> and the following maven dependency
>   <dependency>
>             <groupId>org.apache.kafka</groupId>
>             <artifactId>kafka_2.10</artifactId>
>             <version>0.8.1.1</version>
>         </dependency>
> Any suggestions?
> Thanks
> aarti



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to