Hi, While testing the Kafka 0.8 consumer's ZooKeeper resilience, I found that when the consumer's ZooKeeper session is killed and handleNewSession() is called, the high-level consumer replays messages.
Is this a known issue? I am attaching unit test source code.

package com.netflix.nfkafka.zktest;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.netflix.logging.chukwa.JsonMapper;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.consumer.ZookeeperConsumerConnector;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.IDefaultNameSpace;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.ZkServer;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.test.KillSession;
import org.apache.curator.test.TestingServer;
import org.apache.zookeeper.ZKUtil;
import org.apache.zookeeper.ZooKeeper;
import org.codehaus.jackson.type.TypeReference;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
 * Reproduces message replay in the Kafka 0.8 high-level consumer after its
 * ZooKeeper session is killed.
 *
 * <p>The test starts an in-process ZooKeeper server and two brokers, produces
 * {@link #messageCount} messages, consumes them, kills the consumer's ZooKeeper
 * session, waits for the brokers and the consumer to be registered in
 * ZooKeeper, then produces a second batch and expects the consumer to continue
 * with the first message of that second batch.
 */
public class TestZkSessionKill {
    private static final int BROKER_ID1 = 0;
    private static final int BROKER_ID2 = 1;
    private static final int KAFKA_PORT1 = 2200;
    private static final int KAFKA_PORT2 = 2201;

    /** Number of attempts made while waiting for ZooKeeper registrations. */
    private static final int REGISTRATION_CHECK_ATTEMPTS = 3;
    /** Pause between two registration checks, in milliseconds. */
    private static final long REGISTRATION_CHECK_INTERVAL_MS = 1000L;

    /** Topic name; also used as the consumer group id. */
    public static String feed = "testzksessionkill";
    /** Number of messages produced and consumed per batch. */
    public static int messageCount = 25;

    private static TestingServer zkServer;
    private static KafkaServer server1;
    private static KafkaServer server2;
    private static KafkaConfig config1;
    private static KafkaConfig config2;
    private static ZooKeeper zk1;
    private static ZooKeeper zk2;
    private static ConsumerConnector consumer;
    private static ZooKeeper zkConsumer;
    private static KafkaStream<byte[], byte[]> stream;

    /**
     * Starts ZooKeeper and both brokers, produces the first batch of messages
     * and creates the consumer.
     */
    @BeforeClass
    public static void setup() throws Exception {
        zkServer = new TestingServer(-1, tempDir());

        config1 = new KafkaConfig(createBrokerConfig(BROKER_ID1, KAFKA_PORT1));
        server1 = createServer(config1);
        zk1 = getZk(server1.zkClient());

        config2 = new KafkaConfig(createBrokerConfig(BROKER_ID2, KAFKA_PORT2));
        server2 = createServer(config2);
        zk2 = getZk(server2.zkClient());

        Properties props = new Properties();
        props.setProperty("zookeeper.connect", zkServer.getConnectString());
        props.setProperty("group.id", feed);
        props.setProperty("auto.offset.reset", "smallest");

        generateDataToKafka(0); // initially we have to create the topic
        createConsumer(props);
    }

    /**
     * Builds the broker properties for one local broker.
     *
     * @param nodeId broker id
     * @param port   port the broker listens on
     * @return the broker configuration properties
     */
    public static Properties createBrokerConfig(int nodeId, int port) {
        Properties props = new Properties();
        props.put("broker.id", Integer.toString(nodeId));
        props.put("brokerId", Integer.toString(nodeId));
        props.put("host.name", "localhost");
        props.put("port", Integer.toString(port));
        props.put("log.dir", tempDir().getAbsolutePath());
        props.put("log.flush.interval.messages", "1");
        props.put("zookeeper.connect", zkServer.getConnectString());
        props.put("replica.socket.timeout.ms", "1500");
        props.put("hostName", "localhost");
        props.put("numPartitions", "1");
        return props;
    }

    /**
     * Creates a randomly named directory under {@code ./build/test} and prints
     * its path.
     *
     * @return the created directory
     */
    public static File tempDir() {
        File f = new File("./build/test", "kafka-" + new Random().nextInt(1000000));
        f.mkdirs();
        System.out.println(f);
        // NOTE(review): File.deleteOnExit() cannot remove a non-empty directory,
        // so directories that received data are likely left behind.
        f.deleteOnExit();
        return f;
    }

    /**
     * Shuts down the consumer, both brokers and the ZooKeeper server.
     *
     * <p>Every resource is null-checked so that a failure part-way through
     * {@link #setup()} does not turn into a NullPointerException here.
     */
    @AfterClass
    public static void shutdown() throws IOException {
        if (consumer != null) {
            consumer.shutdown();
        }
        if (server1 != null) {
            server1.shutdown();
            server1.awaitShutdown();
        }
        if (server2 != null) {
            server2.shutdown();
            server2.awaitShutdown();
        }
        if (zkServer != null) {
            zkServer.close();
        }
    }

    /**
     * Creates and starts a broker with the given configuration.
     *
     * @param config broker configuration
     * @return the started broker
     */
    public static KafkaServer createServer(KafkaConfig config) throws NoSuchFieldException, IllegalAccessException {
        KafkaServer server = new KafkaServer(config, kafka.utils.SystemTime$.MODULE$);
        server.startup();
        return server;
    }

    /**
     * Extracts the underlying {@link ZooKeeper} handle from a {@link ZkClient}
     * by reading its private {@code _connection} field and that connection's
     * private {@code _zk} field via reflection.
     *
     * @param zkClient client to unwrap
     * @return the ZooKeeper handle held by the client's connection
     */
    public static ZooKeeper getZk(ZkClient zkClient) throws NoSuchFieldException, IllegalAccessException {
        Field f = zkClient.getClass().getDeclaredField("_connection");
        f.setAccessible(true);
        ZkConnection zkConnection = (ZkConnection) f.get(zkClient);

        Field fk = zkConnection.getClass().getDeclaredField("_zk");
        fk.setAccessible(true);
        return (ZooKeeper) fk.get(zkConnection);
    }

    /**
     * Sends {@link #messageCount} JSON messages to {@link #feed}. Message
     * {@code i} carries {@code messageId = i + startNumber} and a timestamp.
     *
     * @param startNumber messageId of the first message in the batch
     */
    public static void generateDataToKafka(int startNumber) throws IOException {
        Properties props = new Properties();
        props.setProperty("metadata.broker.list", getBrokerListStr());
        props.setProperty("producer.discovery", "static");
        props.setProperty("request.required.acks", "0");
        props.setProperty("producer.type", "sync");

        Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(new ProducerConfig(props));
        try {
            for (int i = 0; i < messageCount; ++i) {
                Map<String, Object> map = ImmutableMap.<String, Object>builder()
                        .put("messageId", (i + startNumber))
                        .put("ts", System.currentTimeMillis())
                        .build();
                try {
                    producer.send(new KeyedMessage<byte[], byte[]>(feed, JsonMapper.getInstance().writeValueAsBytes(map)));
                    System.out.println("sent: " + map);
                } catch (JsonProcessingException e) {
                    fail("could not serialize message " + map + ": " + e);
                }
            }
        } finally {
            // Close the producer even when a send or the serialization fails.
            producer.close();
        }
    }

    /**
     * Builds the comma separated {@code host:port} list of both brokers.
     *
     * @return broker list suitable for {@code metadata.broker.list}
     */
    public static String getBrokerListStr() {
        List<String> str = Lists.newArrayList();
        for (KafkaConfig config : Lists.newArrayList(config1, config2)) {
            str.add(config.hostName() + ":" + config.port());
        }
        return StringUtils.join(str, ",");
    }

    /**
     * Consumes the first batch, kills the consumer's ZooKeeper session, waits
     * for the registrations, then produces and consumes a second batch.
     */
    @Test
    public void test() throws Exception {
        consumeDataFromkafka(0);

        // kill sessions
        //KillSession.kill(zk1, zkServer.getConnectString());
        //KillSession.kill(zk2, zkServer.getConnectString());
        KillSession.kill(zkConsumer, zkServer.getConnectString());

        // check broker registration and consumer registration
        ZkClient zkClient = new ZkClient(zkServer.getConnectString());
        try {
            assertTrue("brokers and consumer were not all registered in ZooKeeper",
                    waitForRegistrations(zkClient));
        } finally {
            zkClient.close();
        }

        generateDataToKafka(messageCount);
        consumeDataFromkafka(messageCount);
    }

    /**
     * Polls ZooKeeper until both broker ids exist and the consumer group has
     * at least one registered id, giving up after
     * {@link #REGISTRATION_CHECK_ATTEMPTS} attempts.
     *
     * @param zkClient client used for the lookups
     * @return {@code true} if all registrations were seen in the same attempt
     */
    private static boolean waitForRegistrations(ZkClient zkClient) throws InterruptedException {
        String broker1Path = ZkUtils.BrokerIdsPath() + "/" + BROKER_ID1;
        String broker2Path = ZkUtils.BrokerIdsPath() + "/" + BROKER_ID2;
        String consumerIdsPath = ZkUtils.ConsumersPath() + "/" + feed + "/ids";

        for (int attempt = 0; attempt < REGISTRATION_CHECK_ATTEMPTS; ++attempt) {
            boolean allRegistered = zkClient.exists(broker1Path)
                    && zkClient.exists(broker2Path)
                    && !zkClient.getChildren(consumerIdsPath).isEmpty();
            if (allRegistered) {
                return true;
            }
            Thread.sleep(REGISTRATION_CHECK_INTERVAL_MS);
        }
        return false;
    }

    /**
     * Reads {@link #messageCount} messages from the stream and asserts that
     * their messageIds are consecutive, starting at {@code startNumber}.
     *
     * @param startNumber messageId expected for the first message read
     */
    public static void consumeDataFromkafka(int startNumber) throws IOException {
        for (int i = 0; i < messageCount; ++i) {
            Map<String, Object> obj = JsonMapper.getInstance().readValue(
                    stream.iterator().next().message(),
                    new TypeReference<Map<String, Object>>() {}
            );
            // JUnit's argument order is (expected, actual).
            assertEquals(i + startNumber, obj.get("messageId"));
            System.out.println("consumed:" + obj);
        }
    }

    /**
     * Creates the high-level consumer with a single stream for {@link #feed}
     * and captures the consumer's ZooKeeper handle via reflection so the test
     * can kill its session.
     *
     * @param consumerProps consumer configuration
     */
    public static void createConsumer(Properties consumerProps) throws NoSuchFieldException, IllegalAccessException {
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));

        final Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                consumer.createMessageStreams(ImmutableMap.of(feed, 1));
        final List<KafkaStream<byte[], byte[]>> streamList = streams.get(feed);
        if (streamList == null || streamList.size() != 1) {
            fail("expected exactly one stream for topic " + feed + " but got " + streamList);
        }
        stream = streamList.get(0);

        // Unwrap the Scala connector behind the Java facade, then its ZkClient.
        Field f = consumer.getClass().getDeclaredField("underlying");
        f.setAccessible(true);
        ZookeeperConsumerConnector zkConsumerConnector = (ZookeeperConsumerConnector) f.get(consumer);

        Field fk = zkConsumerConnector.getClass().getDeclaredField("kafka$consumer$ZookeeperConsumerConnector$$zkClient");
        fk.setAccessible(true);
        ZkClient zkClient = (ZkClient) fk.get(zkConsumerConnector);
        zkConsumer = getZk(zkClient);
    }
}

Thank you Best, Jae