Hello. I still could not progress in this issue.
As per Jay Kreps recent email in thread 'delay of producer and consumer in kafka 0.9 is too big to be accepted', I will do the *TestEndToEndLatency *test to see my latency. But, besides that, am I doing something wrong in the code below? Thanks & regards, Rodrigo ---------- Forwarded message ---------- From: Rodrigo Ottero <ott...@gmail.com> Date: Fri, Jun 17, 2016 at 5:33 PM Subject: Messages delayed in jUnit test (version 0.9.0.0) To: users@kafka.apache.org Hi. I am trying to use an embedded Kafka server to allow me to create tests in jUnit using a real Kafka implementation, instead of a stub or a mock. I am using Kafka version 0.9.0.0. The test works, but the consumer poll has to wait for 3 seconds to get the message. Here is the code I am running: ---------- // given a Kafka producer, a kafka consumer, a topic and a message final Producer<String, String> producer = createKafkaProducer(); final KafkaConsumer<String, String> consumer = createKafkaConsumer(topicName); // when the producer publishes the message to the topic producer.send(new ProducerRecord<String, String>(topicName, message)); // then the consumer can read it from the topic final ConsumerRecords<String, String> records = consumer.poll(3000); assertTrue("The topic should have only one message. The number of messages found is: " + records.count(), records.count() == 1); consumer.close(); ---------- For this to work, consumer.poll(...) must be 3 seconds to get the message, otherwise the consumer will get an empty ConsumerRecords ( records.count() == 0). I tried sleeping the execution for 10 seconds before reaching the consumer, to see if it was some delay caused by the server startup, but it did not change the poll's time. However, if I use the kafka-console-consumer.bat present in Kafka installation and point it to my embedded server, it reads it almost immediately. Here is the configuration used in the test's consumer: ---------- final Properties props = new Properties(); props.put("bootstrap.servers", kafkaServerAndPort); props.put("group.id", "anyGroupId"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("auto.offset.reset", "earliest"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topicName)); ---------- Would anyone have some idea why it is taking 3 seconds for the poll to grab the message? Thanks in advance! PS: here is part of the code used to startup the Kafka and Zookeeper servers: ----------- public EmbeddedKafkaAndZookeeperServers() throws Exception { removeExistingLogFiles(); startZookeeperServer(); startKafkaServer(); } private void startKafkaServer() throws IOException { final KafkaConfig config = getKafkaConfig(zookeeperServer.getConnectString()); kafkaServer = new KafkaServerStartable(config); kafkaServer.startup(); } private void startZookeeperServer() throws Exception { final int shouldUseARandomPort = -1; final File temporaryDirectory = new File(ZOOKEEPER_LOGS_DIRECTORY); final boolean shouldStartImmediately = true; zookeeperServer = new TestingServer(shouldUseARandomPort, temporaryDirectory, shouldStartImmediately); } private static KafkaConfig getKafkaConfig(final String zookeeperConnectString) throws IOException { final Properties props = new Properties(); props.put("broker.id", BROKER_ID); props.put("port", BROKER_PORT); props.put("log.dir", KAFKA_LOGS_DIRECTORY); props.put("zookeeper.connect", zookeeperConnectString); props.put("host.name", "127.0.0.1"); props.put("auto.create.topics.enable", "true"); return new KafkaConfig(props); } -----------