That makes sense. Thank you.

On Thu, Mar 27, 2014 at 8:32 PM, Neha Narkhede <neha.narkh...@gmail.com>wrote:

> When I call consumer.commitOffsets(); before killing session, unit test
> succeeded. This problem would happen only with autoCommit enabled
>
> That seems expected. If you call commitOffsets() explicitly before
> simulating a GC pause on the consumer, there will be no duplicates since
> the next consumer instance that picks up the same partitions will start
> reading from the last checkpointed offset. On the other hand, autoCommit
> will only commit at a certain interval. So if you decide to pause the
> consumer between 2 intervals, then it will replay data since the last
> interval.
>
> Thanks,
> Neha
>
>
> On Thu, Mar 27, 2014 at 4:21 PM, Bae, Jae Hyeon <metac...@gmail.com>
> wrote:
>
> > When I call consumer.commitOffsets(); before killing session, unit test
> > succeeded. This problem would happen only with autoCommit enabled.
> >
> > Could you fix this problem before releasing 0.8.1.1?
> >
> > Thank you
> > Best, Jae
> >
> >
> > On Thu, Mar 27, 2014 at 3:57 PM, Bae, Jae Hyeon <metac...@gmail.com>
> > wrote:
> >
> > > Hi
> > >
> > > While testing kafka 0.8 consumer's zk resilience, I found that on the
> zk
> > > session kill and handleNewSession() is called, high level consumer is
> > > replaying messages.
> > >
> > > Is this know issue? I am attaching unit test source code.
> > >
> > > package com.netflix.nfkafka.zktest;
> > >
> > > import com.fasterxml.jackson.core.JsonProcessingException;
> > > import com.google.common.collect.ImmutableMap;
> > > import com.google.common.collect.Lists;
> > > import com.netflix.logging.chukwa.JsonMapper;
> > > import kafka.consumer.Consumer;
> > > import kafka.consumer.ConsumerConfig;
> > > import kafka.consumer.KafkaStream;
> > > import kafka.consumer.ZookeeperConsumerConnector;
> > > import kafka.javaapi.consumer.ConsumerConnector;
> > > import kafka.javaapi.producer.Producer;
> > > import kafka.producer.KeyedMessage;
> > > import kafka.producer.ProducerConfig;
> > > import kafka.server.KafkaConfig;
> > > import kafka.server.KafkaServer;
> > > import kafka.utils.ZkUtils;
> > > import org.I0Itec.zkclient.IDefaultNameSpace;
> > > import org.I0Itec.zkclient.ZkClient;
> > > import org.I0Itec.zkclient.ZkConnection;
> > > import org.I0Itec.zkclient.ZkServer;
> > > import org.apache.commons.io.FileUtils;
> > > import org.apache.commons.lang.StringUtils;
> > > import org.apache.curator.test.KillSession;
> > > import org.apache.curator.test.TestingServer;
> > > import org.apache.zookeeper.ZKUtil;
> > > import org.apache.zookeeper.ZooKeeper;
> > > import org.codehaus.jackson.type.TypeReference;
> > > import org.junit.AfterClass;
> > > import org.junit.BeforeClass;
> > > import org.junit.Test;
> > >
> > > import java.io.File;
> > > import java.io.IOException;
> > > import java.lang.reflect.Field;
> > > import java.util.List;
> > > import java.util.Map;
> > > import java.util.Properties;
> > > import java.util.Random;
> > >
> > > import static org.junit.Assert.assertEquals;
> > > import static org.junit.Assert.assertTrue;
> > > import static org.junit.Assert.fail;
> > >
> > > public class TestZkSessionKill {
> > >     private static TestingServer zkServer;
> > >     private static KafkaServer server1;
> > >     private static KafkaServer server2;
> > >     private static KafkaConfig config1;
> > >     private static KafkaConfig config2;
> > >     private static ZooKeeper zk1;
> > >     private static ZooKeeper zk2;
> > >
> > >     private static ConsumerConnector consumer;
> > >     private static ZooKeeper zkConsumer;
> > >     private static KafkaStream<byte[], byte[]> stream;
> > >
> > >     private static final int    BROKER_ID1 = 0;
> > >     private static final int    BROKER_ID2 = 1;
> > >     private static final int    KAFKA_PORT1 = 2200;
> > >     private static final int    KAFKA_PORT2 = 2201;
> > >
> > >     public static String feed = "testzksessionkill";
> > >
> > >     @BeforeClass
> > >     public static void setup() throws Exception {
> > >         zkServer = new TestingServer(-1, tempDir());
> > >         config1 = new KafkaConfig(createBrokerConfig(BROKER_ID1,
> > > KAFKA_PORT1));
> > >         server1 = createServer(config1);
> > >         zk1 = getZk(server1.zkClient());
> > >
> > >         config2 = new KafkaConfig(createBrokerConfig(BROKER_ID2,
> > > KAFKA_PORT2));
> > >         server2 = createServer(config2);
> > >         zk2 = getZk(server2.zkClient());
> > >
> > >         Properties props = new Properties();
> > >         props.setProperty("zookeeper.connect",
> > > zkServer.getConnectString());
> > >         props.setProperty("group.id", feed);
> > >         props.setProperty("auto.offset.reset", "smallest");
> > >
> > >         generateDataToKafka(0); // initially we have to create the
> topic
> > >
> > >         createConsumer(props);
> > >     }
> > >
> > >     public static Properties createBrokerConfig(int nodeId, int port) {
> > >         Properties props = new Properties();
> > >         props.put("broker.id",
> > > Integer.toString(nodeId));
> > >         props.put("brokerId",
> >  Integer.toString(nodeId));
> > >         props.put("host.name",                   "localhost");
> > >         props.put("port",
>  Integer.toString(port));
> > >         props.put("log.dir",
> > > tempDir().getAbsolutePath());
> > >         props.put("log.flush.interval.messages", "1");
> > >         props.put("zookeeper.connect",
> > > zkServer.getConnectString());
> > >         props.put("replica.socket.timeout.ms",   "1500");
> > >         props.put("hostName",                    "localhost");
> > >         props.put("numPartitions",               "1");
> > >
> > >         return props;
> > >     }
> > >
> > >     public static File tempDir() {
> > >         File f = new File("./build/test", "kafka-" + new
> > > Random().nextInt(1000000));
> > >         f.mkdirs();
> > >         System.out.println(f);
> > >         f.deleteOnExit();
> > >         return f;
> > >     }
> > >
> > >     @AfterClass
> > >     public static void shutdown() throws IOException {
> > >         if (server1 != null) {
> > >             server1.shutdown();
> > >             server1.awaitShutdown();
> > >         }
> > >
> > >         if (server2 != null) {
> > >             server2.shutdown();
> > >             server2.awaitShutdown();
> > >         }
> > >
> > >         zkServer.close();
> > >     }
> > >
> > >     public static KafkaServer createServer(KafkaConfig config) throws
> > > NoSuchFieldException, IllegalAccessException {
> > >         KafkaServer server = new KafkaServer(config,
> > > kafka.utils.SystemTime$.MODULE$);
> > >         server.startup();
> > >         return server;
> > >     }
> > >
> > >     public static ZooKeeper getZk(ZkClient zkClient) throws
> > > NoSuchFieldException, IllegalAccessException {
> > >         Field f = zkClient.getClass().getDeclaredField("_connection");
> > >         f.setAccessible(true);
> > >         ZkConnection zkConnection = (ZkConnection) f.get(zkClient);
> > >         Field fk = zkConnection.getClass().getDeclaredField("_zk");
> > >         fk.setAccessible(true);
> > >         return (ZooKeeper) fk.get(zkConnection);
> > >     }
> > >
> > >
> > >     public static int messageCount = 25;
> > >
> > >     public static void generateDataToKafka(int startNumber) throws
> > > IOException {
> > >         Properties props = new Properties();
> > >         props.setProperty("metadata.broker.list", getBrokerListStr());
> > >         props.setProperty("producer.discovery", "static");
> > >         props.setProperty("request.required.acks", "0");
> > >         props.setProperty("producer.type", "sync");
> > >
> > >         Producer<byte[], byte[]> producer = new Producer<byte[],
> > > byte[]>(new ProducerConfig(props));
> > >         for (int i = 0; i < messageCount; ++i) {
> > >             Map<String, Object> map = ImmutableMap.<String,
> > > Object>builder()
> > >                     .put("messageId", (i + startNumber))
> > >                     .put("ts", System.currentTimeMillis())
> > >                     .build();
> > >
> > >             try {
> > >                 producer.send(new KeyedMessage<byte[], byte[]>(feed,
> > > JsonMapper.getInstance().writeValueAsBytes(map)));
> > >                 System.out.println("sent: " + map);
> > >             } catch (JsonProcessingException e) {
> > >                 fail();
> > >             }
> > >         }
> > >         producer.close();
> > >     }
> > >
> > >     public static String getBrokerListStr() {
> > >         List<String> str = Lists.newArrayList();
> > >         for (KafkaConfig config : Lists.newArrayList(config1,
> config2)) {
> > >             str.add(config.hostName() + ":" + config.port());
> > >         }
> > >         return StringUtils.join(str, ",");
> > >     }
> > >
> > >     @Test
> > >     public void test() throws Exception {
> > >         consumeDataFromkafka(0);
> > >
> > >         // kill sessions
> > >         //KillSession.kill(zk1, zkServer.getConnectString());
> > >         //KillSession.kill(zk2, zkServer.getConnectString());
> > >         KillSession.kill(zkConsumer, zkServer.getConnectString());
> > >
> > >         // check broker registration and consumer registration
> > >         ZkClient zkClient = new ZkClient(zkServer.getConnectString());
> > >         boolean check1 = false;
> > >         boolean check2 = false;
> > >         boolean check3 = false;
> > >         for (int i = 0; i < 3; ++i) {
> > >             if (!zkClient.exists(ZkUtils.BrokerIdsPath() + "/" +
> > > BROKER_ID1)) {
> > >                 Thread.sleep(1000);
> > >                 continue;
> > >             } else {
> > >                 check1 = true;
> > >             }
> > >             if (!zkClient.exists(ZkUtils.BrokerIdsPath() + "/" +
> > > BROKER_ID2)) {
> > >                 Thread.sleep(1000);
> > >                 continue;
> > >             } else {
> > >                 check2 = true;
> > >             }
> > >             if (zkClient.getChildren(ZkUtils.ConsumersPath() + "/" +
> feed
> > > + "/ids").isEmpty()) {
> > >                 Thread.sleep(1000);
> > >                 continue;
> > >             } else {
> > >                 check3 = true;
> > >             }
> > >         }
> > >         assertTrue(check1 && check2 && check3);
> > >
> > >         generateDataToKafka(messageCount);
> > >         consumeDataFromkafka(messageCount);
> > >     }
> > >
> > >     public static void consumeDataFromkafka(int startNumber) throws
> > > IOException {
> > >         for (int i = 0; i < messageCount; ++i) {
> > >             Map<String, Object> obj =
> JsonMapper.getInstance().readValue(
> > >                     stream.iterator().next().message(),
> > >                     new TypeReference<Map<String, Object>>() {}
> > >             );
> > >             assertEquals(obj.get("messageId"), i + startNumber);
> > >             System.out.println("consumed:" + obj);
> > >         }
> > >     }
> > >
> > >     public static void createConsumer(Properties consumerProps) throws
> > > NoSuchFieldException, IllegalAccessException {
> > >         consumer = Consumer.createJavaConsumerConnector(new
> > > ConsumerConfig(consumerProps));
> > >
> > >         final Map<String, List<KafkaStream<byte[], byte[]>>> streams =
> > > consumer.createMessageStreams(ImmutableMap.of(feed, 1));
> > >
> > >         final List<KafkaStream<byte[], byte[]>> streamList =
> > > streams.get(feed);
> > >         if (streamList == null || streamList.size() != 1) {
> > >             fail();
> > >         }
> > >
> > >         stream = streamList.get(0);
> > >
> > >         Field f = consumer.getClass().getDeclaredField("underlying");
> > >         f.setAccessible(true);
> > >         ZookeeperConsumerConnector zkConsumerConnector =
> > > (ZookeeperConsumerConnector) f.get(consumer);
> > >
> > >         Field fk =
> > >
> >
> zkConsumerConnector.getClass().getDeclaredField("kafka$consumer$ZookeeperConsumerConnector$$zkClient");
> > >         fk.setAccessible(true);
> > >         ZkClient zkClient = (ZkClient) fk.get(zkConsumerConnector);
> > >         zkConsumer = getZk(zkClient);
> > >     }
> > > }
> > >
> > >
> > > Thank you
> > > Best, Jae
> > >
> >
>

Reply via email to