Hi, We are evaluating kafka-0.8 for our product. We will start consumer for each partition. When i try to consume using High-Level API i could able to consume from kafka. But when i try to consume from kafka using Low-Level API, i am getting message size as 0. Am i missing some configuration?
PS: I have attached the test code used for testing SimpleConsumer Thanks in advance Ranjith Venkatesan
package kafka.examples; import java.util.logging.Level; import java.util.logging.Logger; import kafka.api.FetchRequest; import kafka.api.FetchRequestBuilder; import kafka.common.ErrorMapping; import kafka.javaapi.FetchResponse; import kafka.javaapi.consumer.SimpleConsumer; import kafka.javaapi.message.ByteBufferMessageSet; import kafka.message.Message; import kafka.message.MessageAndOffset; //$Id$ public class SimpleKafkaConsumer implements Runnable { private static final Logger LOGGER = Logger.getLogger(SimpleKafkaConsumer.class.getName()); private static final int TIME_OUT = 100000; private static final int BUFFER_SIZE = 64 * 1024; private static final int FETCH_SIZE = 10000; private int partitionId; private int errorCount = 0; private Long readOffset; private Long processedOffset = 0l; private String leader; private String clientName; private String topic; private SimpleConsumer consumer = null; private SimpleKafkaConsumer(String topic, int partitionId, Long readOffset, String leader) throws Exception { this.topic = topic; this.partitionId = partitionId; this.readOffset = readOffset; this.leader = leader; //init clientName = KafkaConsumerUtil.formClientName(topic, partitionId); } /** * Fetches messages, writes to underlying Writer and schedules itself for * execution. */ @Override public void run() { int i=0; while(true) { try { //check leader status, if null get leader from kafka broker try { if (leader == null) { leader = KafkaConsumerUtil.findLeader(topic, partitionId, 5); } } catch (Exception e) { LOGGER.log(Level.SEVERE, "Cannot find leader for topic " + topic + " of partition " + partitionId + ", will retry at next schedule,", e); } //leader cannot be found, try at next execution. if (leader == null) { return; } //check simpleconsumer status, if null create new simpleconsumer instance if (consumer == null) { LOGGER.log(Level.INFO, "Leader for {0} is {1}.", new Object[] { clientName, leader }); consumer = new SimpleConsumer(leader.split(":")[0], Integer.parseInt(leader.split(":")[1]), TIME_OUT, BUFFER_SIZE, clientName); } //check read offset status, if null get earliest offset from kafka broker if (readOffset == null) { readOffset = KafkaConsumerUtil.getOffset(consumer, topic, partitionId, true, clientName); } //fetch message ByteBufferMessageSet fetchedMessages = handleFetch(consumer, topic, partitionId, readOffset); LOGGER.log(Level.INFO, "Reading from offset {0} for {1}.", new Object[] { readOffset, clientName }); //read fetched messages and write if (fetchedMessages != null) { LOGGER.log(Level.INFO, "Fetched message of size {0} bytes.", fetchedMessages.sizeInBytes()); for (MessageAndOffset messageAndOffset : fetchedMessages) { long currentOffset = messageAndOffset.offset(); if (currentOffset < readOffset) { LOGGER.log(Level.INFO, "Found old offset {0}, but expecting {1} for Client {2}", new Object[] { currentOffset, readOffset, clientName }); continue; } try { handleMessage(messageAndOffset.message()); } catch (Exception e) { LOGGER.log(Level.INFO, "Exception while handling message,", e); } processedOffset = readOffset; readOffset = messageAndOffset.nextOffset(); } } Thread.sleep(1000); } catch(Exception e) { e.printStackTrace(); close(); } finally { if(i!=0 && i%10 ==0) { close(); } } i++; } } //kafka consumer related /** * Fetch data from <b>Topic</b> leader for given <b>Partition</b> from given * <b>Offset</b>. */ private ByteBufferMessageSet handleFetch(SimpleConsumer consumer, String topic, int partitionId, long readOffset) { FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(topic, partitionId, readOffset, FETCH_SIZE).build(); FetchResponse fetchResponse = consumer.fetch(req); if (fetchResponse.hasError()) { handleFetchError(fetchResponse.errorCode(topic, partitionId)); return null; } else { return fetchResponse.messageSet(topic, partitionId); } } /** * Take action based on Error Code. */ private void handleFetchError(short errorCode) { LOGGER.log(Level.INFO, "Error fetching data from the Broker {0} for topic {1} partitionId {2} with error code {3}", new Object[] { leader, topic, partitionId, errorCode }); if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) { readOffset = KafkaConsumerUtil.getOffset(consumer, topic, partitionId, true, clientName); } else if (errorCode == ErrorMapping.BrokerNotAvailableCode() || errorCode == ErrorMapping.LeaderNotAvailableCode() || errorCode == ErrorMapping.NotLeaderForPartitionCode()) { errorCount++; if (errorCount > 5) { errorCount = 0; consumer.close(); consumer = null; leader = null; } } } /** * Closes underlying Writer, commits Offset and {@link SimpleConsumer} * instance. */ private void close() { try { commitOffset(processedOffset); if (consumer != null) { consumer.close(); } } catch(Exception e) { } } //offset persist /** * Persist the given <b>Offset</b> to DB. */ private void commitOffset(long offsetToCommit) throws Exception { //Update in DB } //message processing related /** * Obtain message key and payload from given {@link Message} object and * process it. */ private void handleMessage(Message message) throws Exception { byte[] keyBytes = KafkaConsumerUtil.byteBufferToByteArray(message.key()); byte[] msgBytes = KafkaConsumerUtil.byteBufferToByteArray(message.payload()); System.out.println(msgBytes); //processMessage(topic, new JSONObject(new String(keyBytes)), msgBytes); } /** * Parse message key, process data based on <b>Log Type</b> obtained from * message key, writes to underlying Writer and commits Offset when the * writer Rotates its files. */ public void processMessage(String topic, byte[] msg) throws Exception { } @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append('{');//No I18N sb.append('"').append("Topic").append('"').append(':').append('"').append(topic).append('"').append(',');//No I18N sb.append('"').append("PartitionId").append('"').append(':').append('"').append(partitionId).append('"').append(',');//No I18N sb.append('"').append("Client Name").append('"').append(':').append(clientName);//No I18N sb.append('}');//No I18N return sb.toString(); } public static void main(String[] args) { try { SimpleKafkaConsumer consumer = new SimpleKafkaConsumer("TOPIC1", 1, null, null); Thread t = new Thread(consumer); t.start(); } catch(Exception e) { e.printStackTrace(); } } }
package kafka.examples; //$Id: $ import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import kafka.api.PartitionOffsetRequestInfo; import kafka.common.TopicAndPartition; import kafka.javaapi.OffsetResponse; import kafka.javaapi.PartitionMetadata; import kafka.javaapi.TopicMetadata; import kafka.javaapi.TopicMetadataRequest; import kafka.javaapi.consumer.SimpleConsumer; public class KafkaConsumerUtil { private static final Logger LOGGER = Logger.getLogger(KafkaConsumerUtil.class.getName()); /** * Gets {@link PartitionMetadata} for given <b>Topic</b> and * <b>PartitionId</b> list from Kafka Broker using * {@link TopicMetadataRequest}. * * @param topicPartitionDetail * A {@link Map} with key as <b>Topic</b> and value as * {@link Integer} {@link List} of <b>PartitionId</b>. */ public static Map<String, ArrayList<PartitionMetadata>> getTopicPartitionMeta(Map<String, ArrayList<Integer>> topicPartitionDetail) throws Exception { List<String> brokerList = new ArrayList<String>(); brokerList.add("search360-ubuntu12:9092"); brokerList.add("search360-ubuntu12:9091"); brokerList.add("search360-ubuntu12:9090"); List<String> topicList = new ArrayList<String>(topicPartitionDetail.keySet()); Map<String, ArrayList<PartitionMetadata>> returnTopicPartitionMeta = new HashMap<String, ArrayList<PartitionMetadata>>(); for (String brokerIP : brokerList) { SimpleConsumer consumer = null; String ip = brokerIP.split(":")[0]; int port = Integer.parseInt(brokerIP.split(":")[1]); try { consumer = new SimpleConsumer(ip, port, 100000, 64 * 1024, "leaderLookup");//No I18N TopicMetadataRequest req = new TopicMetadataRequest(topicList); kafka.javaapi.TopicMetadataResponse resp = consumer.send(req); List<TopicMetadata> metaData = resp.topicsMetadata(); for (TopicMetadata topicMeta : metaData) { String topic = topicMeta.topic(); if (!topicPartitionDetail.containsKey(topic)) { continue; } List<Integer> partitionList = topicPartitionDetail.get(topic); int count = partitionList.size(); for (PartitionMetadata part : topicMeta.partitionsMetadata()) { if (count == 0) { break; } Integer partId = part.partitionId(); if (partitionList.contains(partId)) { if (!returnTopicPartitionMeta.containsKey(topic)) { returnTopicPartitionMeta.put(topic, new ArrayList<PartitionMetadata>()); } returnTopicPartitionMeta.get(topic).add(part); count--; } } } } catch (Exception e) { LOGGER.log(Level.INFO, "Error communicating with Broker " + brokerIP + " to find Leader for " + topicPartitionDetail, e); } finally { if (consumer != null) { consumer.close(); } } } return returnTopicPartitionMeta; } /** * Gets {@link PartitionMetadata} for given <b>Topic</b> and * <b>PartitionId</b> from Kafka Broker using {@link TopicMetadataRequest}. */ public static PartitionMetadata getTopicPartitionMeta(String topic, int partitionId) throws Exception { Map<String, ArrayList<Integer>> map = new HashMap<String, ArrayList<Integer>>(1); ArrayList<Integer> list = new ArrayList<Integer>(1); list.add(partitionId); map.put(topic, list); Map<String, ArrayList<PartitionMetadata>> retMap = getTopicPartitionMeta(map); if (retMap.containsKey(topic)) { return retMap.get(topic).get(0); } else { return null; } } /** * Find <b>Leader</b> for given <b>Topic</b> and <b>PartitionId</b>. */ public static String findLeader(String topic, int partitionId, int maxRetry) throws Exception { return findNewLeader("", topic, partitionId, maxRetry); } /** * Find new <b>Leader</b> for given <b>Topic</b> and <b>PartitionId</b>. */ public static String findNewLeader(String oldLeader, String topic, int partitionId, int maxRetry) throws Exception { int i = 0; long sleep = 0l; boolean goToSleep; while (maxRetry < 0 || i < maxRetry) { goToSleep = false; PartitionMetadata metadata = getTopicPartitionMeta(topic, partitionId); if (metadata == null || metadata.leader() == null) { goToSleep = true; } else if (oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) { // first time through if the leader hasn't changed give ZooKeeper a second to recover // second time, assume the broker did recover before failover, or it was a non-Broker issue // goToSleep = true; } else { return metadata.leader().host()+":"+metadata.leader().port(); } if (goToSleep) { sleep = 1000; LOGGER.log(Level.INFO, "Cannot find leader for topic {0} partitionId {1} in retry {2}, retrying after {3} millis.", new Object[] { topic, partitionId, i, sleep }); try { Thread.sleep(sleep); } catch (InterruptedException ie) { } } } if (oldLeader.equalsIgnoreCase("")) { throw new Exception("Unable to find New leader after Broker failure."); } else { throw new Exception("Unable to find leader."); } } /** * Get <b>Offset</b> from given <b>Topic</b> and <b>PartitionId</b> using * {@link kafka.javaapi.OffsetRequest}. * * @param earliestTime * <b>True</b> to get Earliest Offset, <b>False</b> to get Latest * Offset. */ public static long getOffset(SimpleConsumer consumer, String topic, int partition, boolean earliestTime, String clientName) { long whichTime; if (earliestTime) { whichTime = kafka.api.OffsetRequest.EarliestTime(); } else { whichTime = kafka.api.OffsetRequest.LatestTime(); } TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); OffsetResponse response = consumer.getOffsetsBefore(request); if (response.hasError()) { LOGGER.log(Level.INFO, "Error fetching Offset Data from Broker with Error Code {0}.", response.errorCode(topic, partition)); return 0l; } long[] offsets = response.offsets(topic, partition); return offsets[0]; } /** * Convert {@link Byte}[] from {@link ByteBuffer}. */ public static byte[] byteBufferToByteArray(ByteBuffer bb) { byte[] returnBytes = new byte[bb.limit()]; bb.get(returnBytes); return returnBytes; } /** * Schedules itself for execution after given time. */ public static void scheduleSleep(ScheduledThreadPoolExecutor executor, Runnable runnable, long delay, TimeUnit unit) { LOGGER.log(Level.INFO, "Schedule sleep for {0} {1} for thread {2}.", new Object[] { delay, unit, getThreadName() }); executor.schedule(runnable, delay, unit); } /** * Form Client Name used for communication with Kafka Brokers. */ public static String formClientName(String topic, int partitionId) { String myIp; try { myIp = InetAddress.getLocalHost().getHostAddress(); } catch (Exception e) { LOGGER.log(Level.INFO, "Exception while getting my ip, so using \'Client\' instead of ip", e); myIp = "Client";//No I18N } return myIp + "_" + topic + "_" + partitionId; } /** * Returns current Threads Name. */ public static String getThreadName() { return Thread.currentThread().getName(); } /** * Sets given threadName as current Threads Name. */ public static void setThreadName(String threadName) { Thread.currentThread().setName(threadName); } }