aarti gupta created KAFKA-1657:
----------------------------------
Summary: Fetch request using SimpleConsumer fails due to "Leader not local for partition"
Key: KAFKA-1657
URL: https://issues.apache.org/jira/browse/KAFKA-1657
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: aarti gupta
I have a three-node Kafka cluster running on the same physical machine (on
different ports), with replication factor 3 and a single topic with 3 partitions.
Multiple producers write to the topic, and a custom partitioner is used to
direct messages to a given partition.
I use the simple consumer to read from a given partition of the topic, and have
three instances of my consumer running.
The SimpleConsumer example code suggests that any node in the cluster (not
necessarily the leader for that partition) is sufficient to find the leader for
the partition. However, when I run it against a node other than the leader, a
NullPointerException is thrown and the broker logs show the following error:
[2014-09-28 20:40:20,984] WARN [KafkaApi-1] Fetch request with correlation id 0
from client testClient on partition [VCCTask,1] failed due to Leader not local
for partition [VCCTask,1] on broker 1 (kafka.server.KafkaApis)
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic VCCTask
Topic:VCCTask PartitionCount:3 ReplicationFactor:3 Configs:
Topic: VCCTask Partition: 0 Leader: 1 Replicas: 2,3,1 Isr: 1,2,3
Topic: VCCTask Partition: 1 Leader: 1 Replicas: 3,1,2 Isr: 1,2,3
Topic: VCCTask Partition: 2 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
If I specify the leader for the partition instead of an arbitrary node in the
cluster, everything works, but this is an operational nightmare.
I was able to reproduce this with a simple test in which a producer writes the
numbers 1 to 999999 and the consumers consume from specific partitions.
Here are the code snippets:
public class TestConsumerStoreOffsetZookeeper {
public static void main(String[] args) throws JSONException {
TestConsumerStoreOffsetZookeeper testConsumer = new
TestConsumerStoreOffsetZookeeper();
JSONObject jsonObject = new JSONObject();
jsonObject.put("topicName", "VCCTask");
jsonObject.put("clientName", "testClient");
jsonObject.put("partition", args[0]);
jsonObject.put("hostPort", "172.16.78.171");
jsonObject.put("znodeName", "VCCTask");
jsonObject.put("port", args[1]);
testConsumer.initialize(jsonObject);
final long startTime = System.currentTimeMillis();
testConsumer.startReceiving(new FutureCallback<byte[]>() {
int noOfMessagesConsumed= 0;
@Override
public void onSuccess(byte[] result) {
LOG.info("YES!! " + ByteBuffer.wrap(result).getLong());
++noOfMessagesConsumed;
LOG.info("# Messages consumed "+ noOfMessagesConsumed +" Time
elapsed"+ (System.currentTimeMillis()-startTime )/1000 +" seconds");
}
@Override
public void onFailure(Throwable t) {
LOG.info("NO!! " + t.fillInStackTrace().getMessage());
}
});
}
/** Topic consumed by this instance, from the {@code topicName} configuration key. */
private String topicToRead;
/** Logger shared by all instances; never reassigned, hence {@code final}. */
private static final Logger LOG =
        Logger.getLogger("TestConsumerStoreOffsetZookeeper");
/** Host names queried for partition metadata; {@code initialize} replaces this list. */
List<String> seedBrokers = Lists.newArrayList("localhost");
/** Broker port used for every seed broker, from the {@code port} configuration key. */
private int port;
/** Consumer used for offset and fetch requests; (re)assigned by the leader lookup. */
private SimpleConsumer consumer;
/** Partition consumed by this instance, from the {@code partition} configuration key. */
Integer partition;
/** Client id sent with fetch and offset requests, from the {@code clientName} configuration key. */
String clientName;
/** Leader broker reported by the most recent leader lookup. */
private Broker currentLeader;
/** Value of the {@code znodeName} configuration key. NOTE(review): appears to be only assigned, never read — confirm. */
private String counter;
/** ZooKeeper client used to store the consumed offset of the partition. */
CuratorFramework zooKeeper;
public void startReceiving(final FutureCallback<byte[]> futureCallback) {
findLeaderAndUpdateSelfPointers(seedBrokers, port, topicToRead,
partition);
LOG.info("Kafka consumer delegate listening on topic " + topicToRead +
" and partition " + partition);
int numErrors = 0;
while (true) {
long latestOffset = 0;
Stat stat = null;
final String path = "/" + topicToRead + "/"+partition;
try {
//************************Read top of the
stat = zooKeeper.checkExists().forPath(path);
if (stat == null) {
latestOffset =
getLastOffsetFromBeginningOfStream(this.consumer, topicToRead, partition,
OffsetRequest.EarliestTime(), clientName);
byte b[] = new byte[8];
ByteBuffer byteBuffer = ByteBuffer.wrap(b);
byteBuffer.putLong(latestOffset);
final String s =
zooKeeper.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(path);
LOG.info(" Zookeeper create string is "+ s);
stat = zooKeeper.checkExists().forPath(path);
if (stat == null) {
LOG.info("Stat was null");
throw new RuntimeException("Stat in zookeeper was null,
cannot continue as message stream cannot be persisted");
}
} else {
final byte[] data =
zooKeeper.getData().storingStatIn(stat).forPath(path);
if(data.length>0){
latestOffset = ByteBuffer.wrap(data).getLong();
}else {
latestOffset =
getLastOffsetFromBeginningOfStream(this.consumer,topicToRead,partition,OffsetRequest.EarliestTime(),clientName);
}
}
} catch (Exception e) {
throw new RuntimeException(e.fillInStackTrace().getMessage());
}
LOG.info("Topic name is " + topicToRead);
LOG.info("Last offset is " + latestOffset);
LOG.info("Constructing new fetch request on " + topicToRead + "
from offset" + latestOffset);
FetchRequest request = new
FetchRequestBuilder().clientId(clientName).addFetch(topicToRead, partition,
latestOffset, 100000).build();
FetchResponse fetchResponse = consumer.fetch(request);
if (fetchResponse.hasError()) {
numErrors++;
final short code = fetchResponse.errorCode(topicToRead,
partition);
LOG.info("Error fetching data from broker: " + consumer.host()
+ " Reason " + code);
if (code == ErrorMapping.OffsetOutOfRangeCode()) {
LOG.info("Offset out of range error: calculating offset
again");
throw new RuntimeException("Offset is out of range,
multiple consumers are not allowed, this consumer will exit");
}
if (numErrors > 5 && code!=3) {
consumer.close();
consumer = null;
findLeaderAndUpdateSelfPointers(seedBrokers, port,
topicToRead, partition);
numErrors = 0;
}
continue;
}
final ByteBufferMessageSet messageAndOffsets =
fetchResponse.messageSet(topicToRead, partition);
final int validBytes = messageAndOffsets.validBytes();
LOG.info("Received fetch response on topic " + topicToRead + "
from offset" + latestOffset + " fetch response valid bytes is " + validBytes);
try {
if (validBytes == 0) {
LOG.info("No message received");
//Don't keep hammering Kafka
Thread.sleep(1000);
continue;
}
for (MessageAndOffset messageAndOffset : messageAndOffsets) {
LOG.info("Processing offset");
final long currentOffset = messageAndOffset.offset();
LOG.info("Processing offset " + currentOffset);
//in case of compression entire block may be received
if (currentOffset < latestOffset) {
LOG.info("Found an old offset: " + currentOffset +
"Expecting:" + latestOffset);
continue;
}
final ByteBuffer payload =
messageAndOffset.message().payload();
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
LOG.info(this.getClass().getName() + " Received message
from offset" + String.valueOf(latestOffset) + new String(bytes, "UTF-8"));
LOG.info(this.getClass().getName() + " Executing future
callback");
//TODO ***************this should be atomic with writing
job in db***********************
futureCallback.onSuccess(bytes);
try {
long nextOffset = messageAndOffset.nextOffset();
incrementOffset(nextOffset, stat, path);
} catch (KeeperException | InterruptedException e) {
LOG.info("Encountered exception in writing to" +
e.fillInStackTrace().getMessage());
}
//****************************************************************************************
}
LOG.info("Outside for loop");
} catch (Exception e1) {
LOG.info("Error in processing message or running callback " +
e1.getMessage());
futureCallback.onFailure(e1);
throw new RuntimeException(e1);
}
}
}
private void incrementOffset(long nextOffset, Stat stat, String path)
throws Exception {
if (stat == null) {
throw new RuntimeException("Given stat was null");
}
byte b[] = new byte[8];
ByteBuffer byteBuffer = ByteBuffer.wrap(b);
byteBuffer.putLong(nextOffset);
LOG.info("Offset consumed successfully: Setting offset in zookeeper as
next offset: "+ nextOffset);
final Stat statWrite = zooKeeper.setData().forPath(path, b);
if(statWrite.getDataLength() ==0){
throw new RuntimeException("Unable to save offset in zookeeper");
}
}
//TODO: agupta adapters should not have an initialize method, rename and
merge with startListening
public void initialize(JSONObject configData) {
try {
final String hostPort = configData.getString("hostPort");
zooKeeper = CuratorFrameworkFactory.newClient(hostPort,new
ExponentialBackoffRetry(10, 3000));
zooKeeper.start();
this.counter = configData.getString("znodeName");
this.topicToRead = configData.getString("topicName");
LOG.info("Topic name is " + topicToRead);
//TODO: agupta: read seedbrokers from zookeeper
//*ZkClient zkClient = new ZkClient("localhost:2108", 4000, 6000,
new BytesPushThroughSerializer());
//List<String> brokerList = zkClient.getChildren("/brokers/ips");
List<String> seedBrokers = Lists.newArrayList("localhost");
this.seedBrokers = seedBrokers;
this.port = configData.getInt("port");
this.partition= configData.getInt("partition");
this.clientName = configData.getString("clientName");
LOG.info("Finding leader with for partition " + partition + "
clientName " + clientName);
} catch (JSONException | IOException e) {
e.printStackTrace();
LOG.info("Error parsing configuration" + e.getMessage());
} catch (Exception e) {
LOG.info("Error starting zookeeper" + e.getMessage());
}
}
/**
* Find last offset to define where to start reading if this is the first
read
*
* @param consumer
* @param topic
* @param partition
* @param whichTime
* @param clientName
* @return
*/
public static long getLastOffsetFromBeginningOfStream(SimpleConsumer
consumer, String topic, int partition,
long whichTime,
String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new
HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new
PartitionOffsetRequestInfo(whichTime, 1));
kafka.javaapi.OffsetRequest request = new
kafka.javaapi.OffsetRequest(requestInfo,
kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
System.out.println("Error fetching data Offset Data the Broker.
Reason: " + response.errorCode(topic, partition));
return 0;
}
long[] offsets = response.offsets(topic, partition);
return offsets[0];
}
/**
* Return the lead broker for a given topic and partition
*
* @param seedBrokers
* @param port
* @param topic
* @param partition
* @return
*/
private PartitionMetadata findLeaderAndUpdateSelfPointers(List<String>
seedBrokers, int port, String topic, int partition) {
PartitionMetadata returnMetaData = null;
loop:
for (String seed : seedBrokers) {
SimpleConsumer consumer = null;
try {
this.consumer = new SimpleConsumer(seed, port, 100000, 64 *
1024, "leaderLookup");
List<String> topics = Collections.singletonList(topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
List<TopicMetadata> metaData = resp.topicsMetadata();
for (TopicMetadata item : metaData) {
for (PartitionMetadata part : item.partitionsMetadata()) {
if (part.partitionId() == partition) {
returnMetaData = part;
LOG.info("Found leader " +
returnMetaData.leader().host());
break loop;
}
}
}
} catch (Exception e) {
LOG.info("Error communicating with Broker [" + seed + "] to
find Leader for [" + topic
+ ", " + partition + "] Reason: " + e);
} finally {
if (consumer != null) consumer.close();
}
}
LOG.info("KafkaConsumerDelegate initializing self pointers ");
if (returnMetaData != null) {
currentLeader = returnMetaData.leader();
if (currentLeader != null) {
this.consumer = new SimpleConsumer(currentLeader.host(),
currentLeader.port(), 100000, 64 * 1024, clientName);
}
}
LOG.info("KafkaConsumerDelegate: returning metadata");
return returnMetaData;
}
*******************************
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)