Hi, 

      We are evaluating kafka-0.8 for our product. We will start a consumer 
for each partition. When I consume using the high-level API, I am able to 
consume messages from Kafka. But when I consume using the low-level API 
(SimpleConsumer), I get a message size of 0. Am I missing some 
configuration? 
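
For comparison, the high-level consumer we tested with looks roughly like 
the sketch below (a minimal sketch only; the zookeeper address and group id 
are placeholders, not our actual configuration).

package kafka.examples;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class HighLevelConsumerSketch
{
        public static void main(String[] args)
        {
                Properties props = new Properties();
                props.put("zookeeper.connect", "localhost:2181");//placeholder
                props.put("group.id", "test-group");//placeholder
                props.put("auto.commit.interval.ms", "1000");

                ConsumerConnector connector = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

                //one stream for the topic; the connector balances partitions across streams
                Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
                topicCountMap.put("TOPIC1", 1);
                Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topicCountMap);

                //blocks and iterates as messages arrive
                for (MessageAndMetadata<byte[], byte[]> msg : streams.get("TOPIC1").get(0))
                {
                        System.out.println(new String(msg.message()));
                }
        }
}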




PS: I have attached the test code used for testing SimpleConsumer.



Thanks in advance


Ranjith Venkatesan




package kafka.examples;


import java.util.logging.Level;
import java.util.logging.Logger;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.common.ErrorMapping;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.message.MessageAndOffset;





//$Id$


public class SimpleKafkaConsumer implements Runnable
{
        private static final Logger LOGGER = Logger.getLogger(SimpleKafkaConsumer.class.getName());

        private static final int TIME_OUT = 100000;
        private static final int BUFFER_SIZE = 64 * 1024;
        private static final int FETCH_SIZE = 10000;

        private int partitionId;
        private int errorCount = 0;
        private Long readOffset;
        private Long processedOffset = 0L;
        private String leader;
        private String clientName;
        private String topic;

        private SimpleConsumer consumer = null;

        private SimpleKafkaConsumer(String topic, int partitionId, Long readOffset, String leader) throws Exception
        {
                this.topic = topic;
                this.partitionId = partitionId;
                this.readOffset = readOffset;
                this.leader = leader;

                //init
                clientName = KafkaConsumerUtil.formClientName(topic, partitionId);
        }


        /**
         * Fetches messages in a loop, writes them to the underlying Writer and
         * periodically commits the Offset.
         */
        @Override
        public void run()
        {
                int i = 0;
                while (true)
                {
                        try
                        {
                                //check leader status; if null, get leader from kafka broker
                                try
                                {
                                        if (leader == null)
                                        {
                                                leader = KafkaConsumerUtil.findLeader(topic, partitionId, 5);
                                        }
                                }
                                catch (Exception e)
                                {
                                        LOGGER.log(Level.SEVERE, "Cannot find leader for topic " + topic + " of partition " + partitionId + ",", e);
                                }

                                //leader cannot be found even after retries; exit the thread
                                if (leader == null)
                                {
                                        return;
                                }

                                //check simpleconsumer status; if null, create a new simpleconsumer instance
                                if (consumer == null)
                                {
                                        LOGGER.log(Level.INFO, "Leader for {0} is {1}.", new Object[] { clientName, leader });
                                        consumer = new SimpleConsumer(leader.split(":")[0], Integer.parseInt(leader.split(":")[1]), TIME_OUT, BUFFER_SIZE, clientName);
                                }

                                //check read offset status; if null, get the earliest offset from the kafka broker
                                if (readOffset == null)
                                {
                                        readOffset = KafkaConsumerUtil.getOffset(consumer, topic, partitionId, true, clientName);
                                }

                                //fetch messages
                                ByteBufferMessageSet fetchedMessages = handleFetch(consumer, topic, partitionId, readOffset);
                                LOGGER.log(Level.INFO, "Reading from offset {0} for {1}.", new Object[] { readOffset, clientName });

                                //read fetched messages and write
                                if (fetchedMessages != null)
                                {
                                        LOGGER.log(Level.INFO, "Fetched message of size {0} bytes.", fetchedMessages.sizeInBytes());
                                        for (MessageAndOffset messageAndOffset : fetchedMessages)
                                        {
                                                long currentOffset = messageAndOffset.offset();
                                                if (currentOffset < readOffset)
                                                {
                                                        LOGGER.log(Level.INFO, "Found old offset {0}, but expecting {1} for Client {2}", new Object[] { currentOffset, readOffset, clientName });
                                                        continue;
                                                }
                                                try
                                                {
                                                        handleMessage(messageAndOffset.message());
                                                }
                                                catch (Exception e)
                                                {
                                                        LOGGER.log(Level.INFO, "Exception while handling message,", e);
                                                }
                                                processedOffset = readOffset;
                                                readOffset = messageAndOffset.nextOffset();
                                        }
                                }
                                Thread.sleep(1000);
                        }
                        catch (Exception e)
                        {
                                e.printStackTrace();
                                close();
                        }
                        finally
                        {
                                //commit and close every 10 iterations
                                if (i != 0 && i % 10 == 0)
                                {
                                        close();
                                }
                        }
                        i++;
                }
        }

        //kafka consumer related
        /**
         * Fetch data from the <b>Topic</b> leader for the given <b>Partition</b>
         * from the given <b>Offset</b>.
         */
        private ByteBufferMessageSet handleFetch(SimpleConsumer consumer, String topic, int partitionId, long readOffset)
        {
                FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(topic, partitionId, readOffset, FETCH_SIZE).build();
                FetchResponse fetchResponse = consumer.fetch(req);
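                //Note: a fetch at the current log-end offset legitimately returns an
                //empty message set (sizeInBytes() == 0); and if FETCH_SIZE is smaller
                //than the first message at readOffset, the set may hold only a partial
                //message and iterating it yields nothing.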
                if (fetchResponse.hasError())
                {
                        handleFetchError(fetchResponse.errorCode(topic, partitionId));
                        return null;
                }
                else
                {
                        return fetchResponse.messageSet(topic, partitionId);
                }
        }

        /**
         * Take action based on the Error Code.
         */
        private void handleFetchError(short errorCode)
        {
                LOGGER.log(Level.INFO, "Error fetching data from the Broker {0} for topic {1} partitionId {2} with error code {3}", new Object[] { leader, topic, partitionId, errorCode });
                if (errorCode == ErrorMapping.OffsetOutOfRangeCode())
                {
                        readOffset = KafkaConsumerUtil.getOffset(consumer, topic, partitionId, true, clientName);
                }
                else if (errorCode == ErrorMapping.BrokerNotAvailableCode() || errorCode == ErrorMapping.LeaderNotAvailableCode() || errorCode == ErrorMapping.NotLeaderForPartitionCode())
                {
                        errorCount++;
                        if (errorCount > 5)
                        {
                                errorCount = 0;

                                consumer.close();
                                consumer = null;
                                leader = null;
                        }
                }
        }

        /**
         * Commits the Offset and closes the {@link SimpleConsumer} instance.
         */
        private void close()
        {
                try
                {
                        commitOffset(processedOffset);
                        if (consumer != null)
                        {
                                consumer.close();
                                //reset so run() recreates the consumer instead of
                                //fetching on a closed connection
                                consumer = null;
                        }
                }
                catch (Exception e)
                {
                        //ignored: close is best-effort
                }
        }

        //offset persist

        /**
         * Persist the given <b>Offset</b> to the DB.
         */
        private void commitOffset(long offsetToCommit) throws Exception
        {
                //Update in DB
        }

        //message processing related
        /**
         * Obtain the message key and payload from the given {@link Message} object
         * and process it.
         */
        private void handleMessage(Message message) throws Exception
        {
                byte[] keyBytes = KafkaConsumerUtil.byteBufferToByteArray(message.key());
                byte[] msgBytes = KafkaConsumerUtil.byteBufferToByteArray(message.payload());
                //print the payload text, not the array reference
                System.out.println(new String(msgBytes));
                //processMessage(topic, new JSONObject(new String(keyBytes)), msgBytes);
        }

        /**
         * Parse the message key and process the data based on the <b>Log Type</b>
         * obtained from the message key.
         */
        public void processMessage(String topic, byte[] msg) throws Exception
        {

        }

        @Override
        public String toString()
        {
                StringBuilder sb = new StringBuilder();
                sb.append('{');//No I18N
                sb.append('"').append("Topic").append('"').append(':').append('"').append(topic).append('"').append(',');//No I18N
                sb.append('"').append("PartitionId").append('"').append(':').append('"').append(partitionId).append('"').append(',');//No I18N
                sb.append('"').append("Client Name").append('"').append(':').append(clientName);//No I18N
                sb.append('}');//No I18N
                return sb.toString();
        }

        public static void main(String[] args)
        {
                try
                {
                        //partition ids are 0-based; partition 1 assumes TOPIC1 has at least two partitions
                        SimpleKafkaConsumer consumer = new SimpleKafkaConsumer("TOPIC1", 1, null, null);
                        Thread t = new Thread(consumer);
                        t.start();
                }
                catch (Exception e)
                {
                        e.printStackTrace();
                }
        }
}

package kafka.examples;

//$Id: $

import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;




public class KafkaConsumerUtil
{
        private static final Logger LOGGER = Logger.getLogger(KafkaConsumerUtil.class.getName());

        /**
         * Gets {@link PartitionMetadata} for the given <b>Topic</b> and
         * <b>PartitionId</b> list from a Kafka Broker using
         * {@link TopicMetadataRequest}.
         * 
         * @param topicPartitionDetail
         *            A {@link Map} with key as <b>Topic</b> and value as an
         *            {@link Integer} {@link List} of <b>PartitionId</b>s.
         */
        public static Map<String, ArrayList<PartitionMetadata>> getTopicPartitionMeta(Map<String, ArrayList<Integer>> topicPartitionDetail) throws Exception
        {
                //broker list is hardcoded for this test
                List<String> brokerList = new ArrayList<String>();
                brokerList.add("search360-ubuntu12:9092");
                brokerList.add("search360-ubuntu12:9091");
                brokerList.add("search360-ubuntu12:9090");

                List<String> topicList = new ArrayList<String>(topicPartitionDetail.keySet());

                Map<String, ArrayList<PartitionMetadata>> returnTopicPartitionMeta = new HashMap<String, ArrayList<PartitionMetadata>>();

                for (String brokerIP : brokerList)
                {
                        SimpleConsumer consumer = null;
                        String ip = brokerIP.split(":")[0];
                        int port = Integer.parseInt(brokerIP.split(":")[1]);

                        try
                        {
                                consumer = new SimpleConsumer(ip, port, 100000, 64 * 1024, "leaderLookup");//No I18N

                                TopicMetadataRequest req = new TopicMetadataRequest(topicList);
                                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
                                List<TopicMetadata> metaData = resp.topicsMetadata();

                                for (TopicMetadata topicMeta : metaData)
                                {
                                        String topic = topicMeta.topic();
                                        if (!topicPartitionDetail.containsKey(topic))
                                        {
                                                continue;
                                        }
                                        List<Integer> partitionList = topicPartitionDetail.get(topic);
                                        int count = partitionList.size();

                                        for (PartitionMetadata part : topicMeta.partitionsMetadata())
                                        {
                                                if (count == 0)
                                                {
                                                        break;
                                                }

                                                Integer partId = part.partitionId();

                                                if (partitionList.contains(partId))
                                                {
                                                        if (!returnTopicPartitionMeta.containsKey(topic))
                                                        {
                                                                returnTopicPartitionMeta.put(topic, new ArrayList<PartitionMetadata>());
                                                        }
                                                        returnTopicPartitionMeta.get(topic).add(part);
                                                        count--;
                                                }
                                        }
                                }
                        }
                        catch (Exception e)
                        {
                                LOGGER.log(Level.INFO, "Error communicating with Broker " + brokerIP + " to find Leader for " + topicPartitionDetail, e);
                        }
                        finally
                        {
                                if (consumer != null)
                                {
                                        consumer.close();
                                }
                        }
                }
                return returnTopicPartitionMeta;
        }

        /**
         * Gets {@link PartitionMetadata} for the given <b>Topic</b> and
         * <b>PartitionId</b> from a Kafka Broker using {@link TopicMetadataRequest}.
         */
        public static PartitionMetadata getTopicPartitionMeta(String topic, int partitionId) throws Exception
        {
                Map<String, ArrayList<Integer>> map = new HashMap<String, ArrayList<Integer>>(1);
                ArrayList<Integer> list = new ArrayList<Integer>(1);
                list.add(partitionId);
                map.put(topic, list);
                Map<String, ArrayList<PartitionMetadata>> retMap = getTopicPartitionMeta(map);
                if (retMap.containsKey(topic))
                {
                        return retMap.get(topic).get(0);
                }
                else
                {
                        return null;
                }
        }

        /**
         * Find the <b>Leader</b> for the given <b>Topic</b> and <b>PartitionId</b>.
         */
        public static String findLeader(String topic, int partitionId, int maxRetry) throws Exception
        {
                return findNewLeader("", topic, partitionId, maxRetry);
        }

        /**
         * Find a new <b>Leader</b> for the given <b>Topic</b> and <b>PartitionId</b>.
         */
        public static String findNewLeader(String oldLeader, String topic, int partitionId, int maxRetry) throws Exception
        {
                int i = 0;
                long sleep = 0L;
                boolean goToSleep;

                while (maxRetry < 0 || i < maxRetry)
                {
                        goToSleep = false;
                        PartitionMetadata metadata = getTopicPartitionMeta(topic, partitionId);
                        if (metadata == null || metadata.leader() == null)
                        {
                                goToSleep = true;
                        }
                        else if (oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0)
                        {
                                //first time through, if the leader hasn't changed, give ZooKeeper a second to recover;
                                //second time, assume the broker did recover before failover, or it was a non-Broker issue
                                goToSleep = true;
                        }
                        else
                        {
                                return metadata.leader().host() + ":" + metadata.leader().port();
                        }
                        if (goToSleep)
                        {
                                sleep = 1000;

                                LOGGER.log(Level.INFO, "Cannot find leader for topic {0} partitionId {1} in retry {2}, retrying after {3} millis.", new Object[] { topic, partitionId, i, sleep });

                                try
                                {
                                        Thread.sleep(sleep);
                                }
                                catch (InterruptedException ie)
                                {
                                }
                        }
                        //advance the retry counter; without this the loop never terminates when maxRetry >= 0
                        i++;
                }
                if (oldLeader.equalsIgnoreCase(""))
                {
                        throw new Exception("Unable to find leader.");
                }
                else
                {
                        throw new Exception("Unable to find new leader after Broker failure.");
                }
        }

        /**
         * Get the <b>Offset</b> for the given <b>Topic</b> and <b>PartitionId</b>
         * using {@link kafka.javaapi.OffsetRequest}.
         * 
         * @param earliestTime
         *            <b>True</b> to get the Earliest Offset, <b>False</b> to get
         *            the Latest Offset.
         */
        public static long getOffset(SimpleConsumer consumer, String topic, int partition, boolean earliestTime, String clientName)
        {
                long whichTime;
                if (earliestTime)
                {
                        whichTime = kafka.api.OffsetRequest.EarliestTime();
                }
                else
                {
                        whichTime = kafka.api.OffsetRequest.LatestTime();
                }
                TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
                Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
                requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
                kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
                OffsetResponse response = consumer.getOffsetsBefore(request);

                if (response.hasError())
                {
                        LOGGER.log(Level.INFO, "Error fetching Offset Data from Broker with Error Code {0}.", response.errorCode(topic, partition));
                        return 0L;
                }
                long[] offsets = response.offsets(topic, partition);
                return offsets[0];
        }

        /**
         * Converts the given {@link ByteBuffer} to a byte[].
         */
        public static byte[] byteBufferToByteArray(ByteBuffer bb)
        {
                byte[] returnBytes = new byte[bb.limit()];
                bb.get(returnBytes);
                return returnBytes;
        }

        /**
         * Schedules the given Runnable for execution after the given delay.
         */
        public static void scheduleSleep(ScheduledThreadPoolExecutor executor, Runnable runnable, long delay, TimeUnit unit)
        {
                LOGGER.log(Level.INFO, "Schedule sleep for {0} {1} for thread {2}.", new Object[] { delay, unit, getThreadName() });
                executor.schedule(runnable, delay, unit);
        }

        /**
         * Forms the Client Name used for communication with Kafka Brokers.
         */
        public static String formClientName(String topic, int partitionId)
        {
                String myIp;
                try
                {
                        myIp = InetAddress.getLocalHost().getHostAddress();
                }
                catch (Exception e)
                {
                        LOGGER.log(Level.INFO, "Exception while getting local ip, so using \'Client\' instead of ip", e);
                        myIp = "Client";//No I18N
                }
                return myIp + "_" + topic + "_" + partitionId;
        }

        /**
         * Returns the current thread's name.
         */
        public static String getThreadName()
        {
                return Thread.currentThread().getName();
        }

        /**
         * Sets the given threadName as the current thread's name.
         */
        public static void setThreadName(String threadName)
        {
                Thread.currentThread().setName(threadName);
        }

}
