Hi,
We are evaluating Kafka 0.8 for our product. We will start one consumer for
each partition. When I consume using the high-level API, I am able to
consume from Kafka. But when I try to consume using the low-level API
(SimpleConsumer), the fetched message set has a size of 0. Am I missing some
configuration?
PS: I have attached the test code used for testing SimpleConsumer.
Thanks in advance
Ranjith Venkatesan
package kafka.examples;
import java.util.logging.Level;
import java.util.logging.Logger;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.common.ErrorMapping;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.message.MessageAndOffset;
//$Id$
public class SimpleKafkaConsumer implements Runnable
{
// Logger named after this class.
private static final Logger LOGGER =
Logger.getLogger(SimpleKafkaConsumer.class.getName());
// Third argument of the SimpleConsumer constructor in run(); presumably the socket timeout in milliseconds -- confirm against the Kafka API.
private static final int TIME_OUT = 100000;
// Fourth argument of the SimpleConsumer constructor in run(); presumably the receive buffer size in bytes -- confirm against the Kafka API.
private static final int BUFFER_SIZE = 64 * 1024;
// Fetch size handed to FetchRequestBuilder.addFetch() in handleFetch().
private static final int FETCH_SIZE = 10000;
// Partition of the topic this instance reads.
private int partitionId;
// Number of broker/leader availability errors seen by handleFetchError(); reset to 0 there once it exceeds 5.
private int errorCount = 0;
// Offset used for the next fetch; null makes run() ask the broker for the earliest offset.
private Long readOffset;
// Updated in run() after each handled message and passed to commitOffset() by close().
private Long processedOffset = 0l;
// Partition leader in "host:port" form; null makes run() look it up.
private String leader;
// Client id sent with requests; built by KafkaConsumerUtil.formClientName() in the constructor.
private String clientName;
// Topic this instance reads.
private String topic;
// Connection to the leader; created lazily in run().
private SimpleConsumer consumer = null;
/**
 * Creates a consumer bound to a single partition of a single topic.
 *
 * @param topic       topic to read from
 * @param partitionId partition of the topic to read from
 * @param readOffset  offset to start reading at; when {@code null},
 *                    {@link #run()} asks the broker for the earliest offset
 * @param leader      partition leader as {@code host:port}; when
 *                    {@code null}, {@link #run()} looks it up
 * @throws Exception declared by the signature; nothing in this body throws
 *                   a checked exception directly
 */
private SimpleKafkaConsumer(String topic, int partitionId, Long readOffset, String leader) throws Exception
{
    this.leader = leader;
    this.readOffset = readOffset;
    this.partitionId = partitionId;
    this.topic = topic;
    // The client name is derived from topic and partition by the shared utility.
    this.clientName = KafkaConsumerUtil.formClientName(topic, partitionId);
}
/**
* Fetches messages, writes to underlying Writer and schedules itself
for
* execution.
*/
@Override
public void run()
{
int i=0;
while(true)
{
try
{
//check leader status, if null get leader from
kafka broker
try
{
if (leader == null)
{
leader =
KafkaConsumerUtil.findLeader(topic, partitionId, 5);
}
}
catch (Exception e)
{
LOGGER.log(Level.SEVERE, "Cannot find
leader for topic " + topic + " of partition " + partitionId + ", will retry at
next schedule,", e);
}
//leader cannot be found, try at next execution.
if (leader == null)
{
return;
}
//check simpleconsumer status, if null create
new simpleconsumer instance
if (consumer == null)
{
LOGGER.log(Level.INFO, "Leader for {0}
is {1}.", new Object[] { clientName, leader });
consumer = new
SimpleConsumer(leader.split(":")[0], Integer.parseInt(leader.split(":")[1]),
TIME_OUT, BUFFER_SIZE, clientName);
}
//check read offset status, if null get
earliest offset from kafka broker
if (readOffset == null)
{
readOffset =
KafkaConsumerUtil.getOffset(consumer, topic, partitionId, true, clientName);
}
//fetch message
ByteBufferMessageSet fetchedMessages =
handleFetch(consumer, topic, partitionId, readOffset);
LOGGER.log(Level.INFO, "Reading from offset {0}
for {1}.", new Object[] { readOffset, clientName });
//read fetched messages and write
if (fetchedMessages != null)
{
LOGGER.log(Level.INFO, "Fetched message
of size {0} bytes.", fetchedMessages.sizeInBytes());
for (MessageAndOffset messageAndOffset
: fetchedMessages)
{
long currentOffset =
messageAndOffset.offset();
if (currentOffset < readOffset)
{
LOGGER.log(Level.INFO,
"Found old offset {0}, but expecting {1} for Client {2}", new Object[] {
currentOffset, readOffset, clientName });
continue;
}
try
{
handleMessage(messageAndOffset.message());
}
catch (Exception e)
{
LOGGER.log(Level.INFO,
"Exception while handling message,", e);
}
processedOffset = readOffset;
readOffset =
messageAndOffset.nextOffset();
}
}
Thread.sleep(1000);
}
catch(Exception e)
{
e.printStackTrace();
close();
}
finally
{
if(i!=0 && i%10 ==0)
{
close();
}
}
i++;
}
}
//kafka consumer related
/**
 * Sends one fetch request for the given topic partition, starting at the
 * given offset and using {@code FETCH_SIZE} as the fetch size.
 *
 * @param consumer    connection used to send the request
 * @param topic       topic to fetch from
 * @param partitionId partition to fetch from
 * @param readOffset  offset to start fetching at
 * @return the fetched message set, or {@code null} when the response
 *         reported an error (the error code is passed to
 *         {@link #handleFetchError(short)} first)
 */
private ByteBufferMessageSet handleFetch(SimpleConsumer consumer, String topic, int partitionId, long readOffset)
{
    FetchRequestBuilder builder = new FetchRequestBuilder().clientId(clientName);
    FetchRequest request = builder.addFetch(topic, partitionId, readOffset, FETCH_SIZE).build();
    FetchResponse response = consumer.fetch(request);
    if (!response.hasError())
    {
        return response.messageSet(topic, partitionId);
    }
    handleFetchError(response.errorCode(topic, partitionId));
    return null;
}
/**
* Take action based on Error Code.
*/
private void handleFetchError(short errorCode)
{
LOGGER.log(Level.INFO, "Error fetching data from the Broker {0}
for topic {1} partitionId {2} with error code {3}", new Object[] { leader,
topic, partitionId, errorCode });
if (errorCode == ErrorMapping.OffsetOutOfRangeCode())
{
readOffset = KafkaConsumerUtil.getOffset(consumer,
topic, partitionId, true, clientName);
}
else if (errorCode == ErrorMapping.BrokerNotAvailableCode() ||
errorCode == ErrorMapping.LeaderNotAvailableCode() || errorCode ==
ErrorMapping.NotLeaderForPartitionCode())
{
errorCount++;
if (errorCount > 5)
{
errorCount = 0;
consumer.close();
consumer = null;
leader = null;
}
}
}
/**
 * Commits the last processed offset and closes the {@link SimpleConsumer}
 * connection, if one is open.
 *
 * <p>This is best effort: a failure to commit or to close is logged and
 * never propagated, and a failed commit does not prevent the connection
 * from being closed. The consumer reference is cleared afterwards so a
 * closed instance is never reused.
 */
private void close()
{
    try
    {
        commitOffset(processedOffset);
    }
    catch (Exception e)
    {
        LOGGER.log(Level.WARNING, "Failed to commit offset " + processedOffset + " for " + clientName, e);
    }
    finally
    {
        if (consumer != null)
        {
            try
            {
                consumer.close();
            }
            catch (Exception e)
            {
                LOGGER.log(Level.WARNING, "Failed to close consumer for " + clientName, e);
            }
            finally
            {
                consumer = null;
            }
        }
    }
}
//offset persist
/**
* Persist the given <b>Offset</b> to DB.
* <p>
* Currently a stub: the body contains no code, so nothing is stored.
*
* @param offsetToCommit offset to persist
* @throws Exception declared for the future implementation; the stub never throws
*/
private void commitOffset(long offsetToCommit) throws Exception
{
//Update in DB
}
//message processing related
/**
* Obtain message key and payload from given {@link Message} object and
* process it.
*/
private void handleMessage(Message message) throws Exception
{
byte[] keyBytes =
KafkaConsumerUtil.byteBufferToByteArray(message.key());
byte[] msgBytes =
KafkaConsumerUtil.byteBufferToByteArray(message.payload());
System.out.println(msgBytes);
//processMessage(topic, new JSONObject(new String(keyBytes)),
msgBytes);
}
/**
* Placeholder for message processing; the body is currently empty, so
* calling this method does nothing.
* <p>
* Intended behaviour, per the original note: parse the message key, process
* the data based on the <b>Log Type</b> obtained from the key, write to the
* underlying Writer and commit the offset when the writer rotates its files.
*
* @param topic topic the message was read from
* @param msg   raw message bytes
* @throws Exception declared for the future implementation; the stub never throws
*/
public void processMessage(String topic, byte[] msg) throws Exception
{
}
@Override
public String toString()
{
StringBuilder sb = new StringBuilder();
sb.append('{');//No I18N
sb.append('"').append("Topic").append('"').append(':').append('"').append(topic).append('"').append(',');//No
I18N
sb.append('"').append("PartitionId").append('"').append(':').append('"').append(partitionId).append('"').append(',');//No
I18N
sb.append('"').append("Client
Name").append('"').append(':').append(clientName);//No I18N
sb.append('}');//No I18N
return sb.toString();
}
/**
 * Test entry point: starts one consumer thread for partition 1 of topic
 * {@code TOPIC1}, with leader and start offset left to be discovered.
 *
 * @param args ignored
 */
public static void main(String[] args)
{
    try
    {
        Runnable task = new SimpleKafkaConsumer("TOPIC1", 1, null, null);
        Thread worker = new Thread(task);
        worker.start();
    }
    catch (Exception failure)
    {
        failure.printStackTrace();
    }
}
}
package kafka.examples;
//$Id: $
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
public class KafkaConsumerUtil
{
// Logger named after this utility class; shared by all static helpers below.
private static final Logger LOGGER =
Logger.getLogger(KafkaConsumerUtil.class.getName());
/**
* Gets {@link PartitionMetadata} for given <b>Topic</b> and
* <b>PartitionId</b> list from Kafka Broker using
* {@link TopicMetadataRequest}.
*
* @param topicPartitionDetail
* A {@link Map} with key as <b>Topic</b> and value as
* {@link Integer} {@link List} of <b>PartitionId</b>.
*/
public static Map<String, ArrayList<PartitionMetadata>>
getTopicPartitionMeta(Map<String, ArrayList<Integer>> topicPartitionDetail)
throws Exception
{
List<String> brokerList = new ArrayList<String>();
brokerList.add("search360-ubuntu12:9092");
brokerList.add("search360-ubuntu12:9091");
brokerList.add("search360-ubuntu12:9090");
List<String> topicList = new
ArrayList<String>(topicPartitionDetail.keySet());
Map<String, ArrayList<PartitionMetadata>>
returnTopicPartitionMeta = new HashMap<String, ArrayList<PartitionMetadata>>();
for (String brokerIP : brokerList)
{
SimpleConsumer consumer = null;
String ip = brokerIP.split(":")[0];
int port = Integer.parseInt(brokerIP.split(":")[1]);
try
{
consumer = new SimpleConsumer(ip, port, 100000,
64 * 1024, "leaderLookup");//No I18N
TopicMetadataRequest req = new
TopicMetadataRequest(topicList);
kafka.javaapi.TopicMetadataResponse resp =
consumer.send(req);
List<TopicMetadata> metaData =
resp.topicsMetadata();
for (TopicMetadata topicMeta : metaData)
{
String topic = topicMeta.topic();
if
(!topicPartitionDetail.containsKey(topic))
{
continue;
}
List<Integer> partitionList =
topicPartitionDetail.get(topic);
int count = partitionList.size();
for (PartitionMetadata part :
topicMeta.partitionsMetadata())
{
if (count == 0)
{
break;
}
Integer partId =
part.partitionId();
if
(partitionList.contains(partId))
{
if
(!returnTopicPartitionMeta.containsKey(topic))
{
returnTopicPartitionMeta.put(topic, new ArrayList<PartitionMetadata>());
}
returnTopicPartitionMeta.get(topic).add(part);
count--;
}
}
}
}
catch (Exception e)
{
LOGGER.log(Level.INFO, "Error communicating
with Broker " + brokerIP + " to find Leader for " + topicPartitionDetail, e);
}
finally
{
if (consumer != null)
{
consumer.close();
}
}
}
return returnTopicPartitionMeta;
}
/**
 * Convenience lookup of the {@link PartitionMetadata} for one partition of
 * one topic. Delegates to the map-based overload with a single-entry
 * request.
 *
 * @param topic       topic to look up
 * @param partitionId partition to look up
 * @return the first metadata entry returned for the topic, or {@code null}
 *         when the topic is absent from the result
 * @throws Exception if the underlying lookup fails
 */
public static PartitionMetadata getTopicPartitionMeta(String topic, int partitionId) throws Exception
{
    ArrayList<Integer> wantedPartitions = new ArrayList<Integer>(1);
    wantedPartitions.add(partitionId);
    Map<String, ArrayList<Integer>> request = new HashMap<String, ArrayList<Integer>>(1);
    request.put(topic, wantedPartitions);

    Map<String, ArrayList<PartitionMetadata>> found = getTopicPartitionMeta(request);
    return found.containsKey(topic) ? found.get(topic).get(0) : null;
}
/**
 * Looks up the current <b>Leader</b> of the given <b>Topic</b> partition
 * when no previous leader is known. Delegates to
 * {@link #findNewLeader(String, String, int, int)} with an empty old
 * leader.
 *
 * @param topic       topic to look up
 * @param partitionId partition to look up
 * @param maxRetry    retry limit handed to {@code findNewLeader}
 * @return whatever {@code findNewLeader} returns for these arguments
 * @throws Exception if {@code findNewLeader} throws
 */
public static String findLeader(String topic, int partitionId, int maxRetry) throws Exception
{
    final String noPreviousLeader = "";
    String leader = findNewLeader(noPreviousLeader, topic, partitionId, maxRetry);
    return leader;
}
/**
* Find new <b>Leader</b> for given <b>Topic</b> and <b>PartitionId</b>.
*/
public static String findNewLeader(String oldLeader, String topic, int
partitionId, int maxRetry) throws Exception
{
int i = 0;
long sleep = 0l;
boolean goToSleep;
while (maxRetry < 0 || i < maxRetry)
{
goToSleep = false;
PartitionMetadata metadata =
getTopicPartitionMeta(topic, partitionId);
if (metadata == null || metadata.leader() == null)
{
goToSleep = true;
}
else if
(oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0)
{
// first time through if the leader hasn't
changed give ZooKeeper a second to recover
// second time, assume the broker did recover
before failover, or it was a non-Broker issue
//
goToSleep = true;
}
else
{
return
metadata.leader().host()+":"+metadata.leader().port();
}
if (goToSleep)
{
sleep = 1000;
LOGGER.log(Level.INFO, "Cannot find leader for
topic {0} partitionId {1} in retry {2}, retrying after {3} millis.", new
Object[] { topic, partitionId, i, sleep });
try
{
Thread.sleep(sleep);
}
catch (InterruptedException ie)
{
}
}
}
if (oldLeader.equalsIgnoreCase(""))
{
throw new Exception("Unable to find New leader after
Broker failure.");
}
else
{
throw new Exception("Unable to find leader.");
}
}
/**
* Get <b>Offset</b> from given <b>Topic</b> and <b>PartitionId</b>
using
* {@link kafka.javaapi.OffsetRequest}.
*
* @param earliestTime
* <b>True</b> to get Earliest Offset, <b>False</b> to get
Latest
* Offset.
*/
public static long getOffset(SimpleConsumer consumer, String topic, int
partition, boolean earliestTime, String clientName)
{
long whichTime;
if (earliestTime)
{
whichTime = kafka.api.OffsetRequest.EarliestTime();
}
else
{
whichTime = kafka.api.OffsetRequest.LatestTime();
}
TopicAndPartition topicAndPartition = new
TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo
= new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new
PartitionOffsetRequestInfo(whichTime, 1));
kafka.javaapi.OffsetRequest request = new
kafka.javaapi.OffsetRequest(requestInfo,
kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError())
{
LOGGER.log(Level.INFO, "Error fetching Offset Data from
Broker with Error Code {0}.", response.errorCode(topic, partition));
return 0l;
}
long[] offsets = response.offsets(topic, partition);
return offsets[0];
}
/**
 * Copies the remaining bytes of a {@link ByteBuffer} (from its position up
 * to its limit) into a new byte array.
 *
 * <p>The given buffer is left untouched: the bytes are read through a
 * duplicate, so its position and limit do not change.
 *
 * @param bb buffer to copy from; may be {@code null}
 * @return a new array holding the remaining bytes; an empty array when
 *         {@code bb} is {@code null} or has nothing remaining
 */
public static byte[] byteBufferToByteArray(ByteBuffer bb)
{
    if (bb == null)
    {
        return new byte[0];
    }
    // Read through a duplicate so the caller's position is not moved.
    ByteBuffer view = bb.duplicate();
    // remaining(), not limit(): the position is not necessarily 0.
    byte[] returnBytes = new byte[view.remaining()];
    view.get(returnBytes);
    return returnBytes;
}
/**
* Schedules itself for execution after given time.
*/
public static void scheduleSleep(ScheduledThreadPoolExecutor executor,
Runnable runnable, long delay, TimeUnit unit)
{
LOGGER.log(Level.INFO, "Schedule sleep for {0} {1} for thread
{2}.", new Object[] { delay, unit, getThreadName() });
executor.schedule(runnable, delay, unit);
}
/**
* Form Client Name used for communication with Kafka Brokers.
*/
public static String formClientName(String topic, int partitionId)
{
String myIp;
try
{
myIp = InetAddress.getLocalHost().getHostAddress();
}
catch (Exception e)
{
LOGGER.log(Level.INFO, "Exception while getting my ip,
so using \'Client\' instead of ip", e);
myIp = "Client";//No I18N
}
return myIp + "_" + topic + "_" + partitionId;
}
/**
 * Reports the name of the thread that calls this method.
 *
 * @return name of the current thread
 */
public static String getThreadName()
{
    final Thread caller = Thread.currentThread();
    return caller.getName();
}
/**
 * Renames the thread that calls this method.
 *
 * @param threadName new name for the current thread
 */
public static void setThreadName(String threadName)
{
    final Thread caller = Thread.currentThread();
    caller.setName(threadName);
}
}