Darya Merkureva created KAFKA-8380: -------------------------------------- Summary: I can not create a topic, immediately write to it and then read. Key: KAFKA-8380 URL: https://issues.apache.org/jira/browse/KAFKA-8380 Project: Kafka Issue Type: Bug Affects Versions: 2.2.0 Reporter: Darya Merkureva
We are trying to create a topic, immediately write to it and read. For some reason, we read nothing in spite of the fact that we are waiting for the completion of KafkaFuture. {code:java} public class main { private static final String TOPIC_NAME = "topic"; private static final String KEY_NAME = "key"; public static void main(String[] args) { final Properties prodProps = new Properties(); prodProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); prodProps.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer"); prodProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 50000); prodProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); prodProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); final Producer<String, String> prod = new KafkaProducer<>(prodProps); final Properties admProps = new Properties(); admProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); final AdminClient adm = KafkaAdminClient.create(admProps); final Properties consProps = new Properties(); consProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); consProps.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer"); consProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); consProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); consProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); consProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); consProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); final Consumer<String,String> cons = new KafkaConsumer<>(consProps); try { final NewTopic newTopic = new NewTopic(TOPIC_NAME, 1, (short)1); val createTopicsResult = adm.createTopics(Collections.singleton(newTopic)); createTopicsResult.values().get(TOPIC_NAME).get(); } catch (InterruptedException | ExecutionException e) { if (!(e.getCause() instanceof TopicExistsException)) { throw new RuntimeException(e.getMessage(), e); } } final ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, KEY_NAME, "data"); prod.send(producerRecord); prod.send(producerRecord); prod.send(producerRecord); prod.send(producerRecord); cons.subscribe(Arrays.asList(TOPIC_NAME)); val records = cons.poll(Duration.ofSeconds(10)); for(var record: records){ System.out.println(record.value()); } } } {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)