When I call consumer.commitOffsets(); before killing the session, the unit
test succeeds. This problem happens only with autoCommit enabled.

That seems expected. If you call commitOffsets() explicitly before
simulating a GC pause on the consumer, there will be no duplicates, since
the next consumer instance that picks up the same partitions will start
reading from the last checkpointed offset. autoCommit, on the other hand,
only commits at a configured interval (auto.commit.interval.ms), so if the
consumer's session is killed between two commits, it will replay the data
consumed since the last commit.
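
For example, a minimal sketch against your test (reusing its consumer and
zkConsumer fields and Curator's KillSession): committing explicitly pins
the checkpoint before the session dies.

    // Checkpoint the current position explicitly before expiring the session.
    consumer.commitOffsets();
    KillSession.kill(zkConsumer, zkServer.getConnectString());
    // After the rebalance, the new owner of each partition resumes from the
    // committed offset, so nothing is replayed.

    // With auto.commit.enable=true (the default) there is no such guarantee:
    // offsets are flushed only every auto.commit.interval.ms, and anything
    // consumed after the last flush is re-delivered.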

Thanks,
Neha


On Thu, Mar 27, 2014 at 4:21 PM, Bae, Jae Hyeon <metac...@gmail.com> wrote:

> When I call consumer.commitOffsets(); before killing the session, the unit
> test succeeds. This problem happens only with autoCommit enabled.
>
> Could you fix this problem before releasing 0.8.1.1?
>
> Thank you
> Best, Jae
>
>
> On Thu, Mar 27, 2014 at 3:57 PM, Bae, Jae Hyeon <metac...@gmail.com>
> wrote:
>
> > Hi
> >
> > While testing the kafka 0.8 consumer's ZK resilience, I found that when
> > the ZK session is killed and handleNewSession() is called, the high-level
> > consumer replays messages.
> >
> > Is this a known issue? I am attaching the unit test source code.
> >
> > package com.netflix.nfkafka.zktest;
> >
> > import com.fasterxml.jackson.core.JsonProcessingException;
> > import com.google.common.collect.ImmutableMap;
> > import com.google.common.collect.Lists;
> > import com.netflix.logging.chukwa.JsonMapper;
> > import kafka.consumer.Consumer;
> > import kafka.consumer.ConsumerConfig;
> > import kafka.consumer.KafkaStream;
> > import kafka.consumer.ZookeeperConsumerConnector;
> > import kafka.javaapi.consumer.ConsumerConnector;
> > import kafka.javaapi.producer.Producer;
> > import kafka.producer.KeyedMessage;
> > import kafka.producer.ProducerConfig;
> > import kafka.server.KafkaConfig;
> > import kafka.server.KafkaServer;
> > import kafka.utils.ZkUtils;
> > import org.I0Itec.zkclient.IDefaultNameSpace;
> > import org.I0Itec.zkclient.ZkClient;
> > import org.I0Itec.zkclient.ZkConnection;
> > import org.I0Itec.zkclient.ZkServer;
> > import org.apache.commons.io.FileUtils;
> > import org.apache.commons.lang.StringUtils;
> > import org.apache.curator.test.KillSession;
> > import org.apache.curator.test.TestingServer;
> > import org.apache.zookeeper.ZKUtil;
> > import org.apache.zookeeper.ZooKeeper;
> > import org.codehaus.jackson.type.TypeReference;
> > import org.junit.AfterClass;
> > import org.junit.BeforeClass;
> > import org.junit.Test;
> >
> > import java.io.File;
> > import java.io.IOException;
> > import java.lang.reflect.Field;
> > import java.util.List;
> > import java.util.Map;
> > import java.util.Properties;
> > import java.util.Random;
> >
> > import static org.junit.Assert.assertEquals;
> > import static org.junit.Assert.assertTrue;
> > import static org.junit.Assert.fail;
> >
> > public class TestZkSessionKill {
> >     private static TestingServer zkServer;
> >     private static KafkaServer server1;
> >     private static KafkaServer server2;
> >     private static KafkaConfig config1;
> >     private static KafkaConfig config2;
> >     private static ZooKeeper zk1;
> >     private static ZooKeeper zk2;
> >
> >     private static ConsumerConnector consumer;
> >     private static ZooKeeper zkConsumer;
> >     private static KafkaStream<byte[], byte[]> stream;
> >
> >     private static final int    BROKER_ID1 = 0;
> >     private static final int    BROKER_ID2 = 1;
> >     private static final int    KAFKA_PORT1 = 2200;
> >     private static final int    KAFKA_PORT2 = 2201;
> >
> >     public static String feed = "testzksessionkill";
> >
> >     @BeforeClass
> >     public static void setup() throws Exception {
> >         zkServer = new TestingServer(-1, tempDir());
> >         config1 = new KafkaConfig(createBrokerConfig(BROKER_ID1, KAFKA_PORT1));
> >         server1 = createServer(config1);
> >         zk1 = getZk(server1.zkClient());
> >
> >         config2 = new KafkaConfig(createBrokerConfig(BROKER_ID2, KAFKA_PORT2));
> >         server2 = createServer(config2);
> >         zk2 = getZk(server2.zkClient());
> >
> >         Properties props = new Properties();
> >         props.setProperty("zookeeper.connect", zkServer.getConnectString());
> >         props.setProperty("group.id", feed);
> >         props.setProperty("auto.offset.reset", "smallest");
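> >         // NOTE: auto.commit.enable is left at its default (true), so
> >         // offsets are checkpointed only on the auto-commit interval.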
> >
> >         generateDataToKafka(0); // initially we have to create the topic
> >
> >         createConsumer(props);
> >     }
> >
> >     public static Properties createBrokerConfig(int nodeId, int port) {
> >         Properties props = new Properties();
> >         props.put("broker.id",                    Integer.toString(nodeId));
> >         props.put("brokerId",                     Integer.toString(nodeId));
> >         props.put("host.name",                   "localhost");
> >         props.put("port",                        Integer.toString(port));
> >         props.put("log.dir",                      tempDir().getAbsolutePath());
> >         props.put("log.flush.interval.messages", "1");
> >         props.put("zookeeper.connect",            zkServer.getConnectString());
> >         props.put("replica.socket.timeout.ms",   "1500");
> >         props.put("hostName",                    "localhost");
> >         props.put("numPartitions",               "1");
> >
> >         return props;
> >     }
> >
> >     public static File tempDir() {
> >         File f = new File("./build/test", "kafka-" + new Random().nextInt(1000000));
> >         f.mkdirs();
> >         System.out.println(f);
> >         f.deleteOnExit();
> >         return f;
> >     }
> >
> >     @AfterClass
> >     public static void shutdown() throws IOException {
> >         if (server1 != null) {
> >             server1.shutdown();
> >             server1.awaitShutdown();
> >         }
> >
> >         if (server2 != null) {
> >             server2.shutdown();
> >             server2.awaitShutdown();
> >         }
> >
> >         zkServer.close();
> >     }
> >
> >     public static KafkaServer createServer(KafkaConfig config) throws NoSuchFieldException, IllegalAccessException {
> >         KafkaServer server = new KafkaServer(config, kafka.utils.SystemTime$.MODULE$);
> >         server.startup();
> >         return server;
> >     }
> >
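> >     // ZkClient does not expose its underlying ZooKeeper handle, so reach
> >     // through the private _connection field via reflection; Curator's
> >     // KillSession needs the raw ZooKeeper session to expire it.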
> >     public static ZooKeeper getZk(ZkClient zkClient) throws NoSuchFieldException, IllegalAccessException {
> >         Field f = zkClient.getClass().getDeclaredField("_connection");
> >         f.setAccessible(true);
> >         ZkConnection zkConnection = (ZkConnection) f.get(zkClient);
> >         Field fk = zkConnection.getClass().getDeclaredField("_zk");
> >         fk.setAccessible(true);
> >         return (ZooKeeper) fk.get(zkConnection);
> >     }
> >
> >
> >     public static int messageCount = 25;
> >
> >     public static void generateDataToKafka(int startNumber) throws IOException {
> >         Properties props = new Properties();
> >         props.setProperty("metadata.broker.list", getBrokerListStr());
> >         props.setProperty("producer.discovery", "static");
> >         props.setProperty("request.required.acks", "0");
> >         props.setProperty("producer.type", "sync");
> >
> >         Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(new ProducerConfig(props));
> >         for (int i = 0; i < messageCount; ++i) {
> >             Map<String, Object> map = ImmutableMap.<String, Object>builder()
> >                     .put("messageId", (i + startNumber))
> >                     .put("ts", System.currentTimeMillis())
> >                     .build();
> >
> >             try {
> >                 producer.send(new KeyedMessage<byte[], byte[]>(feed, JsonMapper.getInstance().writeValueAsBytes(map)));
> >                 System.out.println("sent: " + map);
> >             } catch (JsonProcessingException e) {
> >                 fail();
> >             }
> >         }
> >         producer.close();
> >     }
> >
> >     public static String getBrokerListStr() {
> >         List<String> str = Lists.newArrayList();
> >         for (KafkaConfig config : Lists.newArrayList(config1, config2)) {
> >             str.add(config.hostName() + ":" + config.port());
> >         }
> >         return StringUtils.join(str, ",");
> >     }
> >
> >     @Test
> >     public void test() throws Exception {
> >         consumeDataFromkafka(0);
> >
> >         // kill sessions
> >         //KillSession.kill(zk1, zkServer.getConnectString());
> >         //KillSession.kill(zk2, zkServer.getConnectString());
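> >         // Expiring only the consumer's session triggers handleNewSession()
> >         // and a rebalance, simulating a long GC pause on the consumer.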
> >         KillSession.kill(zkConsumer, zkServer.getConnectString());
> >
> >         // check broker registration and consumer registration
> >         ZkClient zkClient = new ZkClient(zkServer.getConnectString());
> >         boolean check1 = false;
> >         boolean check2 = false;
> >         boolean check3 = false;
> >         for (int i = 0; i < 3; ++i) {
> >             if (!zkClient.exists(ZkUtils.BrokerIdsPath() + "/" + BROKER_ID1)) {
> >                 Thread.sleep(1000);
> >                 continue;
> >             } else {
> >                 check1 = true;
> >             }
> >             if (!zkClient.exists(ZkUtils.BrokerIdsPath() + "/" + BROKER_ID2)) {
> >                 Thread.sleep(1000);
> >                 continue;
> >             } else {
> >                 check2 = true;
> >             }
> >             if (zkClient.getChildren(ZkUtils.ConsumersPath() + "/" + feed + "/ids").isEmpty()) {
> >                 Thread.sleep(1000);
> >                 continue;
> >             } else {
> >                 check3 = true;
> >             }
> >         }
> >         assertTrue(check1 && check2 && check3);
> >
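> >         // Produce a second batch and consume it; with autoCommit enabled,
> >         // the consumer replays messages from before the session kill, and
> >         // the messageId assertion in consumeDataFromkafka() fails.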
> >         generateDataToKafka(messageCount);
> >         consumeDataFromkafka(messageCount);
> >     }
> >
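> >     // Consumes messageCount messages and asserts consecutive messageIds
> >     // starting at startNumber; any replayed message breaks this assertion.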
> >     public static void consumeDataFromkafka(int startNumber) throws IOException {
> >         for (int i = 0; i < messageCount; ++i) {
> >             Map<String, Object> obj = JsonMapper.getInstance().readValue(
> >                     stream.iterator().next().message(),
> >                     new TypeReference<Map<String, Object>>() {}
> >             );
> >             assertEquals(obj.get("messageId"), i + startNumber);
> >             System.out.println("consumed:" + obj);
> >         }
> >     }
> >
> >     public static void createConsumer(Properties consumerProps) throws NoSuchFieldException, IllegalAccessException {
> >         consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));
> >
> >         final Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumer.createMessageStreams(ImmutableMap.of(feed, 1));
> >
> >         final List<KafkaStream<byte[], byte[]>> streamList = streams.get(feed);
> >         if (streamList == null || streamList.size() != 1) {
> >             fail();
> >         }
> >
> >         stream = streamList.get(0);
> >
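> >         // Same reflection trick as getZk(): pull the private zkClient out
> >         // of the underlying ZookeeperConsumerConnector so the test can
> >         // expire the consumer's own ZK session.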
> >         Field f = consumer.getClass().getDeclaredField("underlying");
> >         f.setAccessible(true);
> >         ZookeeperConsumerConnector zkConsumerConnector = (ZookeeperConsumerConnector) f.get(consumer);
> >
> >         Field fk = zkConsumerConnector.getClass().getDeclaredField("kafka$consumer$ZookeeperConsumerConnector$$zkClient");
> >         fk.setAccessible(true);
> >         ZkClient zkClient = (ZkClient) fk.get(zkConsumerConnector);
> >         zkConsumer = getZk(zkClient);
> >     }
> > }
> >
> >
> > Thank you
> > Best, Jae
> >
>
