Hi

While testing kafka 0.8 consumer's zk resilience, I found that on the zk
session kill and handleNewSession() is called, high level consumer is
replaying messages.

Is this know issue? I am attaching unit test source code.

package com.netflix.nfkafka.zktest;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.netflix.logging.chukwa.JsonMapper;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.consumer.ZookeeperConsumerConnector;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.IDefaultNameSpace;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.ZkServer;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.test.KillSession;
import org.apache.curator.test.TestingServer;
import org.apache.zookeeper.ZKUtil;
import org.apache.zookeeper.ZooKeeper;
import org.codehaus.jackson.type.TypeReference;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class TestZkSessionKill {
    private static TestingServer zkServer;
    private static KafkaServer server1;
    private static KafkaServer server2;
    private static KafkaConfig config1;
    private static KafkaConfig config2;
    private static ZooKeeper zk1;
    private static ZooKeeper zk2;

    private static ConsumerConnector consumer;
    private static ZooKeeper zkConsumer;
    private static KafkaStream<byte[], byte[]> stream;

    private static final int    BROKER_ID1 = 0;
    private static final int    BROKER_ID2 = 1;
    private static final int    KAFKA_PORT1 = 2200;
    private static final int    KAFKA_PORT2 = 2201;

    public static String feed = "testzksessionkill";

    @BeforeClass
    public static void setup() throws Exception {
        zkServer = new TestingServer(-1, tempDir());
        config1 = new KafkaConfig(createBrokerConfig(BROKER_ID1,
KAFKA_PORT1));
        server1 = createServer(config1);
        zk1 = getZk(server1.zkClient());

        config2 = new KafkaConfig(createBrokerConfig(BROKER_ID2,
KAFKA_PORT2));
        server2 = createServer(config2);
        zk2 = getZk(server2.zkClient());

        Properties props = new Properties();
        props.setProperty("zookeeper.connect", zkServer.getConnectString());
        props.setProperty("group.id", feed);
        props.setProperty("auto.offset.reset", "smallest");

        generateDataToKafka(0); // initially we have to create the topic

        createConsumer(props);
    }

    public static Properties createBrokerConfig(int nodeId, int port) {
        Properties props = new Properties();
        props.put("broker.id",                   Integer.toString(nodeId));
        props.put("brokerId",                    Integer.toString(nodeId));
        props.put("host.name",                   "localhost");
        props.put("port",                        Integer.toString(port));
        props.put("log.dir",
tempDir().getAbsolutePath());
        props.put("log.flush.interval.messages", "1");
        props.put("zookeeper.connect",
zkServer.getConnectString());
        props.put("replica.socket.timeout.ms",   "1500");
        props.put("hostName",                    "localhost");
        props.put("numPartitions",               "1");

        return props;
    }

    public static File tempDir() {
        File f = new File("./build/test", "kafka-" + new
Random().nextInt(1000000));
        f.mkdirs();
        System.out.println(f);
        f.deleteOnExit();
        return f;
    }

    @AfterClass
    public static void shutdown() throws IOException {
        if (server1 != null) {
            server1.shutdown();
            server1.awaitShutdown();
        }

        if (server2 != null) {
            server2.shutdown();
            server2.awaitShutdown();
        }

        zkServer.close();
    }

    public static KafkaServer createServer(KafkaConfig config) throws
NoSuchFieldException, IllegalAccessException {
        KafkaServer server = new KafkaServer(config,
kafka.utils.SystemTime$.MODULE$);
        server.startup();
        return server;
    }

    public static ZooKeeper getZk(ZkClient zkClient) throws
NoSuchFieldException, IllegalAccessException {
        Field f = zkClient.getClass().getDeclaredField("_connection");
        f.setAccessible(true);
        ZkConnection zkConnection = (ZkConnection) f.get(zkClient);
        Field fk = zkConnection.getClass().getDeclaredField("_zk");
        fk.setAccessible(true);
        return (ZooKeeper) fk.get(zkConnection);
    }


    public static int messageCount = 25;

    public static void generateDataToKafka(int startNumber) throws
IOException {
        Properties props = new Properties();
        props.setProperty("metadata.broker.list", getBrokerListStr());
        props.setProperty("producer.discovery", "static");
        props.setProperty("request.required.acks", "0");
        props.setProperty("producer.type", "sync");

        Producer<byte[], byte[]> producer = new Producer<byte[],
byte[]>(new ProducerConfig(props));
        for (int i = 0; i < messageCount; ++i) {
            Map<String, Object> map = ImmutableMap.<String, Object>builder()
                    .put("messageId", (i + startNumber))
                    .put("ts", System.currentTimeMillis())
                    .build();

            try {
                producer.send(new KeyedMessage<byte[], byte[]>(feed,
JsonMapper.getInstance().writeValueAsBytes(map)));
                System.out.println("sent: " + map);
            } catch (JsonProcessingException e) {
                fail();
            }
        }
        producer.close();
    }

    public static String getBrokerListStr() {
        List<String> str = Lists.newArrayList();
        for (KafkaConfig config : Lists.newArrayList(config1, config2)) {
            str.add(config.hostName() + ":" + config.port());
        }
        return StringUtils.join(str, ",");
    }

    @Test
    public void test() throws Exception {
        consumeDataFromkafka(0);

        // kill sessions
        //KillSession.kill(zk1, zkServer.getConnectString());
        //KillSession.kill(zk2, zkServer.getConnectString());
        KillSession.kill(zkConsumer, zkServer.getConnectString());

        // check broker registration and consumer registration
        ZkClient zkClient = new ZkClient(zkServer.getConnectString());
        boolean check1 = false;
        boolean check2 = false;
        boolean check3 = false;
        for (int i = 0; i < 3; ++i) {
            if (!zkClient.exists(ZkUtils.BrokerIdsPath() + "/" +
BROKER_ID1)) {
                Thread.sleep(1000);
                continue;
            } else {
                check1 = true;
            }
            if (!zkClient.exists(ZkUtils.BrokerIdsPath() + "/" +
BROKER_ID2)) {
                Thread.sleep(1000);
                continue;
            } else {
                check2 = true;
            }
            if (zkClient.getChildren(ZkUtils.ConsumersPath() + "/" + feed +
"/ids").isEmpty()) {
                Thread.sleep(1000);
                continue;
            } else {
                check3 = true;
            }
        }
        assertTrue(check1 && check2 && check3);

        generateDataToKafka(messageCount);
        consumeDataFromkafka(messageCount);
    }

    public static void consumeDataFromkafka(int startNumber) throws
IOException {
        for (int i = 0; i < messageCount; ++i) {
            Map<String, Object> obj = JsonMapper.getInstance().readValue(
                    stream.iterator().next().message(),
                    new TypeReference<Map<String, Object>>() {}
            );
            assertEquals(obj.get("messageId"), i + startNumber);
            System.out.println("consumed:" + obj);
        }
    }

    public static void createConsumer(Properties consumerProps) throws
NoSuchFieldException, IllegalAccessException {
        consumer = Consumer.createJavaConsumerConnector(new
ConsumerConfig(consumerProps));

        final Map<String, List<KafkaStream<byte[], byte[]>>> streams =
consumer.createMessageStreams(ImmutableMap.of(feed, 1));

        final List<KafkaStream<byte[], byte[]>> streamList =
streams.get(feed);
        if (streamList == null || streamList.size() != 1) {
            fail();
        }

        stream = streamList.get(0);

        Field f = consumer.getClass().getDeclaredField("underlying");
        f.setAccessible(true);
        ZookeeperConsumerConnector zkConsumerConnector =
(ZookeeperConsumerConnector) f.get(consumer);

        Field fk =
zkConsumerConnector.getClass().getDeclaredField("kafka$consumer$ZookeeperConsumerConnector$$zkClient");
        fk.setAccessible(true);
        ZkClient zkClient = (ZkClient) fk.get(zkConsumerConnector);
        zkConsumer = getZk(zkClient);
    }
}


Thank you
Best, Jae

Reply via email to