Ansel Zandegran created KAFKA-6052:
--------------------------------------

             Summary: Consumers not polling when isolation.level=read_committed
                 Key: KAFKA-6052
                 URL: https://issues.apache.org/jira/browse/KAFKA-6052
             Project: Kafka
          Issue Type: Bug
          Components: consumer, producer
    Affects Versions: 0.11.0.0
         Environment: Windows 10. All processes running in embedded mode
            Reporter: Ansel Zandegran
         Attachments: logFile.log

I am trying to send a transactional record with exactly-once semantics. These are my producer, consumer and broker setups.

    public void sendWithTTemp(String topic, EHEvent event) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                  "localhost:9092,localhost:9093,localhost:9094");
        // props.put("bootstrap.servers", "34.240.248.190:9092,52.50.95.30:9092,52.50.95.30:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put("transactional.id", "TID" + transactionId.incrementAndGet());
        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "5000");

        Producer<String, String> producer =
            new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

        Logger.log(this, "Initializing transaction...");
        producer.initTransactions();
        Logger.log(this, "Initializing done.");

        try {
            Logger.log(this, "Begin transaction...");
            producer.beginTransaction();
            Logger.log(this, "Begin transaction done.");
            Logger.log(this, "Sending events...");
            producer.send(new ProducerRecord<>(topic,
                                               event.getKey().toString(),
                                               event.getValue().toString()));
            Logger.log(this, "Sending events done.");
            Logger.log(this, "Committing...");
            producer.commitTransaction();
            Logger.log(this, "Committing done.");
        } catch (ProducerFencedException | OutOfOrderSequenceException
                | AuthorizationException e) {
            producer.close();
            e.printStackTrace();
        } catch (KafkaException e) {
            producer.abortTransaction();
            e.printStackTrace();
        }
        producer.close();
    }

*In Consumer*
I have set isolation.level=read_committed

*In 3 Brokers*
I'm running with the following properties

    Properties props = new Properties();
    props.setProperty("broker.id", "" + i);
    props.setProperty("listeners", "PLAINTEXT://:909" + (2 + i));
    props.setProperty("log.dirs", Configuration.KAFKA_DATA_PATH + "\\B" + i);
    props.setProperty("num.partitions", "1");
    props.setProperty("zookeeper.connect", "localhost:2181");
    props.setProperty("zookeeper.connection.timeout.ms", "6000");
    props.setProperty("min.insync.replicas", "2");
    props.setProperty("offsets.topic.replication.factor", "2");
    props.setProperty("offsets.topic.num.partitions", "1");
    props.setProperty("transaction.state.log.num.partitions", "2");
    props.setProperty("transaction.state.log.replication.factor", "2");
    props.setProperty("transaction.state.log.min.isr", "2");

I am not getting any records in the consumer. When I set isolation.level=read_uncommitted, I get the records. I assume that the records are not getting committed. What could be the problem? Log attached.
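
The report gives only one consumer setting, isolation.level=read_committed. For reference, a minimal sketch of what a matching consumer configuration might look like is shown here. Everything except isolation.level and the bootstrap servers is an assumption and is not taken from the report: the class name, the group id, the offset reset policy, the auto-commit setting and the String deserializers are all illustrative.

```java
import java.util.Properties;

public class ReadCommittedConsumerConfig {

    // Builds consumer settings as plain string key/value pairs.
    // Only isolation.level and the bootstrap servers come from the report;
    // the remaining values are assumptions chosen for illustration.
    static Properties consumerProps(String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers",
                "localhost:9092,localhost:9093,localhost:9094");
        props.setProperty("group.id", groupId);               // hypothetical group id
        props.setProperty("isolation.level", "read_committed"); // hide uncommitted records
        props.setProperty("enable.auto.commit", "false");     // assumption
        props.setProperty("auto.offset.reset", "earliest");   // assumption
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("eh-events-test");
        // Print the effective settings in a stable order for comparison.
        props.stringPropertyNames().stream()
                .sorted()
                .forEach(k -> System.out.println(k + "=" + props.getProperty(k)));
    }
}
```

With kafka-clients on the classpath, these properties could be passed to new KafkaConsumer<>(props). Switching isolation.level to read_uncommitted in the same configuration is the comparison described in the report.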