That makes sense. Thank you.
On Thu, Mar 27, 2014 at 8:32 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> When I call consumer.commitOffsets(); before killing the session, the
> unit test succeeded. This problem would happen only with autoCommit
> enabled.
>
> That seems expected. If you call commitOffsets() explicitly before
> simulating a GC pause on the consumer, there will be no duplicates, since
> the next consumer instance that picks up the same partitions will start
> reading from the last checkpointed offset. On the other hand, autoCommit
> only commits at a fixed interval. So if you pause the consumer between
> two intervals, it will replay the data consumed since the last commit.
>
> Thanks,
> Neha
>
>
> On Thu, Mar 27, 2014 at 4:21 PM, Bae, Jae Hyeon <metac...@gmail.com>
> wrote:
>
> > When I call consumer.commitOffsets(); before killing the session, the
> > unit test succeeded. This problem would happen only with autoCommit
> > enabled.
> >
> > Could you fix this problem before releasing 0.8.1.1?
> >
> > Thank you
> > Best, Jae
> >
> >
> > On Thu, Mar 27, 2014 at 3:57 PM, Bae, Jae Hyeon <metac...@gmail.com>
> > wrote:
> >
> > > Hi
> > >
> > > While testing the Kafka 0.8 consumer's ZK resilience, I found that
> > > when the ZK session is killed and handleNewSession() is called, the
> > > high-level consumer replays messages.
> > >
> > > Is this a known issue? I am attaching the unit test source code.
> > >
> > > package com.netflix.nfkafka.zktest;
> > >
> > > import com.fasterxml.jackson.core.JsonProcessingException;
> > > import com.google.common.collect.ImmutableMap;
> > > import com.google.common.collect.Lists;
> > > import com.netflix.logging.chukwa.JsonMapper;
> > > import kafka.consumer.Consumer;
> > > import kafka.consumer.ConsumerConfig;
> > > import kafka.consumer.KafkaStream;
> > > import kafka.consumer.ZookeeperConsumerConnector;
> > > import kafka.javaapi.consumer.ConsumerConnector;
> > > import kafka.javaapi.producer.Producer;
> > > import kafka.producer.KeyedMessage;
> > > import kafka.producer.ProducerConfig;
> > > import kafka.server.KafkaConfig;
> > > import kafka.server.KafkaServer;
> > > import kafka.utils.ZkUtils;
> > > import org.I0Itec.zkclient.ZkClient;
> > > import org.I0Itec.zkclient.ZkConnection;
> > > import org.apache.commons.lang.StringUtils;
> > > import org.apache.curator.test.KillSession;
> > > import org.apache.curator.test.TestingServer;
> > > import org.apache.zookeeper.ZooKeeper;
> > > import org.codehaus.jackson.type.TypeReference;
> > > import org.junit.AfterClass;
> > > import org.junit.BeforeClass;
> > > import org.junit.Test;
> > >
> > > import java.io.File;
> > > import java.io.IOException;
> > > import java.lang.reflect.Field;
> > > import java.util.List;
> > > import java.util.Map;
> > > import java.util.Properties;
> > > import java.util.Random;
> > >
> > > import static org.junit.Assert.assertEquals;
> > > import static org.junit.Assert.assertTrue;
> > > import static org.junit.Assert.fail;
> > >
> > > public class TestZkSessionKill {
> > >     private static TestingServer zkServer;
> > >     private static KafkaServer server1;
> > >     private static KafkaServer server2;
> > >     private static KafkaConfig config1;
> > >     private static KafkaConfig config2;
> > >     private static ZooKeeper zk1;
> > >     private static ZooKeeper zk2;
> > >
> > >     private static ConsumerConnector consumer;
> > >     private static ZooKeeper zkConsumer;
> > >     private static KafkaStream<byte[], byte[]> stream;
> > >
> > >     private static final int BROKER_ID1 = 0;
> > >     private static final int BROKER_ID2 = 1;
> > >     private static final int KAFKA_PORT1 = 2200;
> > >     private static final int KAFKA_PORT2 = 2201;
> > >
> > >     public static String feed = "testzksessionkill";
> > >
> > >     @BeforeClass
> > >     public static void setup() throws Exception {
> > >         zkServer = new TestingServer(-1, tempDir());
> > >         config1 = new KafkaConfig(createBrokerConfig(BROKER_ID1,
> > >                 KAFKA_PORT1));
> > >         server1 = createServer(config1);
> > >         zk1 = getZk(server1.zkClient());
> > >
> > >         config2 = new KafkaConfig(createBrokerConfig(BROKER_ID2,
> > >                 KAFKA_PORT2));
> > >         server2 = createServer(config2);
> > >         zk2 = getZk(server2.zkClient());
> > >
> > >         Properties props = new Properties();
> > >         props.setProperty("zookeeper.connect", zkServer.getConnectString());
> > >         props.setProperty("group.id", feed);
> > >         props.setProperty("auto.offset.reset", "smallest");
> > >
> > >         generateDataToKafka(0); // initially we have to create the topic
> > >
> > >         createConsumer(props);
> > >     }
> > >
> > >     public static Properties createBrokerConfig(int nodeId, int port) {
> > >         Properties props = new Properties();
> > >         props.put("broker.id", Integer.toString(nodeId));
> > >         props.put("brokerId", Integer.toString(nodeId));
> > >         props.put("host.name", "localhost");
> > >         props.put("port", Integer.toString(port));
> > >         props.put("log.dir", tempDir().getAbsolutePath());
> > >         props.put("log.flush.interval.messages", "1");
> > >         props.put("zookeeper.connect", zkServer.getConnectString());
> > >         props.put("replica.socket.timeout.ms", "1500");
> > >         props.put("hostName", "localhost");
> > >         props.put("numPartitions", "1");
> > >
> > >         return props;
> > >     }
> > >
> > >     public static File tempDir() {
> > >         File f = new File("./build/test", "kafka-" + new
> > >                 Random().nextInt(1000000));
> > >         f.mkdirs();
> > >         System.out.println(f);
> > >         f.deleteOnExit();
> > >         return f;
> > >     }
> > >
> > >     @AfterClass
> > >     public static void shutdown() throws IOException {
> > >         if (server1 != null) {
> > >             server1.shutdown();
> > >             server1.awaitShutdown();
> > >         }
> > >
> > >         if (server2 != null) {
> > >             server2.shutdown();
> > >             server2.awaitShutdown();
> > >         }
> > >
> > >         zkServer.close();
> > >     }
> > >
> > >     public static KafkaServer createServer(KafkaConfig config) throws
> > >             NoSuchFieldException, IllegalAccessException {
> > >         KafkaServer server = new KafkaServer(config,
> > >                 kafka.utils.SystemTime$.MODULE$);
> > >         server.startup();
> > >         return server;
> > >     }
> > >
> > >     // Pull the raw ZooKeeper handle out of a ZkClient via reflection,
> > >     // so the test can kill its session with Curator's KillSession.
> > >     public static ZooKeeper getZk(ZkClient zkClient) throws
> > >             NoSuchFieldException, IllegalAccessException {
> > >         Field f = zkClient.getClass().getDeclaredField("_connection");
> > >         f.setAccessible(true);
> > >         ZkConnection zkConnection = (ZkConnection) f.get(zkClient);
> > >         Field fk = zkConnection.getClass().getDeclaredField("_zk");
> > >         fk.setAccessible(true);
> > >         return (ZooKeeper) fk.get(zkConnection);
> > >     }
> > >
> > >     public static int messageCount = 25;
> > >
> > >     public static void generateDataToKafka(int startNumber) throws
> > >             IOException {
> > >         Properties props = new Properties();
> > >         props.setProperty("metadata.broker.list", getBrokerListStr());
> > >         props.setProperty("producer.discovery", "static");
> > >         props.setProperty("request.required.acks", "0");
> > >         props.setProperty("producer.type", "sync");
> > >
> > >         Producer<byte[], byte[]> producer = new Producer<byte[],
> > >                 byte[]>(new ProducerConfig(props));
> > >         for (int i = 0; i < messageCount; ++i) {
> > >             Map<String, Object> map = ImmutableMap.<String, Object>builder()
> > >                     .put("messageId", (i + startNumber))
> > >                     .put("ts", System.currentTimeMillis())
> > >                     .build();
> > >
> > >             try {
> > >                 producer.send(new KeyedMessage<byte[], byte[]>(feed,
> > >                         JsonMapper.getInstance().writeValueAsBytes(map)));
> > >                 System.out.println("sent: " + map);
> > >             } catch (JsonProcessingException e) {
> > >                 fail();
> > >             }
> > >         }
> > >         producer.close();
> > >     }
> > >
> > >     public static String getBrokerListStr() {
> > >         List<String> str = Lists.newArrayList();
> > >         for (KafkaConfig config : Lists.newArrayList(config1, config2)) {
> > >             str.add(config.hostName() + ":" + config.port());
> > >         }
> > >         return StringUtils.join(str, ",");
> > >     }
> > >
> > >     @Test
> > >     public void test() throws Exception {
> > >         consumeDataFromKafka(0);
> > >
> > >         // kill sessions
> > >         //KillSession.kill(zk1, zkServer.getConnectString());
> > >         //KillSession.kill(zk2, zkServer.getConnectString());
> > >         KillSession.kill(zkConsumer, zkServer.getConnectString());
> > >
> > >         // check broker registration and consumer registration
> > >         ZkClient zkClient = new ZkClient(zkServer.getConnectString());
> > >         boolean check1 = false;
> > >         boolean check2 = false;
> > >         boolean check3 = false;
> > >         for (int i = 0; i < 3; ++i) {
> > >             if (!zkClient.exists(ZkUtils.BrokerIdsPath() + "/" +
> > >                     BROKER_ID1)) {
> > >                 Thread.sleep(1000);
> > >                 continue;
> > >             } else {
> > >                 check1 = true;
> > >             }
> > >             if (!zkClient.exists(ZkUtils.BrokerIdsPath() + "/" +
> > >                     BROKER_ID2)) {
> > >                 Thread.sleep(1000);
> > >                 continue;
> > >             } else {
> > >                 check2 = true;
> > >             }
> > >             if (zkClient.getChildren(ZkUtils.ConsumersPath() + "/" + feed
> > >                     + "/ids").isEmpty()) {
> > >                 Thread.sleep(1000);
> > >                 continue;
> > >             } else {
> > >                 check3 = true;
> > >             }
> > >         }
> > >         assertTrue(check1 && check2 && check3);
> > >
> > >         generateDataToKafka(messageCount);
> > >         consumeDataFromKafka(messageCount);
> > >     }
> > >
> > >     public static void consumeDataFromKafka(int startNumber) throws
> > >             IOException {
> > >         for (int i = 0; i < messageCount; ++i) {
> > >             Map<String, Object> obj = JsonMapper.getInstance().readValue(
> > >                     stream.iterator().next().message(),
> > >                     new TypeReference<Map<String, Object>>() {}
> > >             );
> > >             assertEquals(obj.get("messageId"), i + startNumber);
> > >             System.out.println("consumed: " + obj);
> > >         }
> > >     }
> > >
> > >     public static void createConsumer(Properties consumerProps) throws
> > >             NoSuchFieldException, IllegalAccessException {
> > >         consumer = Consumer.createJavaConsumerConnector(new
> > >                 ConsumerConfig(consumerProps));
> > >
> > >         final Map<String, List<KafkaStream<byte[], byte[]>>> streams =
> > >                 consumer.createMessageStreams(ImmutableMap.of(feed, 1));
> > >
> > >         final List<KafkaStream<byte[], byte[]>> streamList =
> > >                 streams.get(feed);
> > >         if (streamList == null || streamList.size() != 1) {
> > >             fail();
> > >         }
> > >
> > >         stream = streamList.get(0);
> > >
> > >         // Reach into the consumer's internals to grab its ZkClient, and
> > >         // from that the raw ZooKeeper session that the test will kill.
> > >         Field f = consumer.getClass().getDeclaredField("underlying");
> > >         f.setAccessible(true);
> > >         ZookeeperConsumerConnector zkConsumerConnector =
> > >                 (ZookeeperConsumerConnector) f.get(consumer);
> > >
> > >         Field fk = zkConsumerConnector.getClass().getDeclaredField(
> > >                 "kafka$consumer$ZookeeperConsumerConnector$$zkClient");
> > >         fk.setAccessible(true);
> > >         ZkClient zkClient = (ZkClient) fk.get(zkConsumerConnector);
> > >         zkConsumer = getZk(zkClient);
> > >     }
> > > }
> > >
> > > Thank you
> > > Best, Jae
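
For anyone skimming this thread later: below is a minimal sketch contrasting
the two commit strategies Neha describes, against the 0.8 high-level consumer
API. The ZooKeeper address, topic, group id, interval value, and the handle()
method are illustrative placeholders, not taken from the test above.

import com.google.common.collect.ImmutableMap;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.Properties;

public class CommitStrategySketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "localhost:2181"); // placeholder
        props.setProperty("group.id", "example-group");           // placeholder

        // Strategy 1: autoCommit. Offsets are checkpointed only every
        // auto.commit.interval.ms, so a session kill (or GC pause) between
        // two intervals replays everything consumed since the last checkpoint.
        props.setProperty("auto.commit.enable", "true");
        props.setProperty("auto.commit.interval.ms", "60000");

        // Strategy 2: disable autoCommit and checkpoint explicitly (see the
        // commitOffsets() call in the loop below), which shrinks the replay
        // window to whatever was processed since the last explicit commit.
        //props.setProperty("auto.commit.enable", "false");

        ConsumerConnector consumer =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        KafkaStream<byte[], byte[]> stream = consumer
                .createMessageStreams(ImmutableMap.of("example-topic", 1))
                .get("example-topic").get(0);

        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            handle(it.next().message()); // application-specific processing
            consumer.commitOffsets();    // explicit checkpoint (strategy 2)
        }
    }

    private static void handle(byte[] message) {
        // placeholder for real processing
    }
}

Note that either way the high-level consumer is at-least-once: even an
explicit commit leaves a window between processing a message and checkpointing
its offset, so commitOffsets() narrows the replay rather than ruling out
duplicates entirely, and in practice you would commit per batch rather than
per message.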