Ansel Zandegran created KAFKA-6052:
--------------------------------------

             Summary: Consumers not polling when isolation.level=read_committed
                 Key: KAFKA-6052
                 URL: https://issues.apache.org/jira/browse/KAFKA-6052
             Project: Kafka
          Issue Type: Bug
          Components: consumer, producer 
    Affects Versions: 0.11.0.0
         Environment: Windows 10. All processes running in embedded mode
            Reporter: Ansel Zandegran
         Attachments: logFile.log

I am trying to send a transactional record with exactly-once semantics. These 
are my producer, consumer and broker setups. 

public void sendWithTTemp(String topic, EHEvent event) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
              "localhost:9092,localhost:9093,localhost:9094");
//    props.put("bootstrap.servers", "34.240.248.190:9092,52.50.95.30:9092,52.50.95.30:9092");
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    props.put("transactional.id", "TID" + transactionId.incrementAndGet());
    props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "5000");

    Producer<String, String> producer =
        new KafkaProducer<>(props,
                            new StringSerializer(),
                            new StringSerializer());

    Logger.log(this, "Initializing transaction...");

    producer.initTransactions();

    Logger.log(this, "Initializing done.");

    try {
      Logger.log(this, "Begin transaction...");
      producer.beginTransaction();
      Logger.log(this, "Begin transaction done.");
      Logger.log(this, "Sending events...");
      producer.send(new ProducerRecord<>(topic,
                                         event.getKey().toString(),
                                         event.getValue().toString()));
      Logger.log(this, "Sending events done.");
      Logger.log(this, "Committing...");
      producer.commitTransaction();
      Logger.log(this, "Committing done.");
    } catch (ProducerFencedException | OutOfOrderSequenceException
        | AuthorizationException e) {
      producer.close();
      e.printStackTrace();
    } catch (KafkaException e) {
      producer.abortTransaction();
      e.printStackTrace();
    }

    producer.close();
  }

*In Consumer*
I have set isolation.level=read_committed
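
For illustration, a minimal sketch of a consumer configuration with this setting. Only isolation.level=read_committed comes from the setup described here; the class name ReadCommittedConsumerConfig, the group id, the String deserializers and auto.offset.reset=earliest are assumptions made for the example, not the reporter's actual code.

```java
import java.util.Properties;

// Hypothetical helper: builds consumer properties for a read_committed consumer.
public class ReadCommittedConsumerConfig {

    public static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Assumed group id; the report does not state one.
        props.setProperty("group.id", groupId);
        // Assumed deserializers, matching the StringSerializer used by the producer.
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // The setting from the report: only records from committed
        // transactions are returned by poll().
        props.setProperty("isolation.level", "read_committed");
        // Assumed: start from the beginning when the group has no committed offset.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        Properties props =
                build("localhost:9092,localhost:9093,localhost:9094", "demo-group");
        // Print the resulting configuration for inspection.
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting Properties object could then be passed to new KafkaConsumer<String, String>(props), with the kafka-clients library on the classpath.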
*In 3 Brokers*
I'm running with the following properties
      Properties props = new Properties();
      props.setProperty("broker.id", "" + i);
      props.setProperty("listeners", "PLAINTEXT://:909" + (2 + i));
      props.setProperty("log.dirs", Configuration.KAFKA_DATA_PATH + "\\B" + i);
      props.setProperty("num.partitions", "1");
      props.setProperty("zookeeper.connect", "localhost:2181");
      props.setProperty("zookeeper.connection.timeout.ms", "6000");
      props.setProperty("min.insync.replicas", "2");
      props.setProperty("offsets.topic.replication.factor", "2");
      props.setProperty("offsets.topic.num.partitions", "1");
      props.setProperty("transaction.state.log.num.partitions", "2");
      props.setProperty("transaction.state.log.replication.factor", "2");
      props.setProperty("transaction.state.log.min.isr", "2");

I am not getting any records in the consumer. When I set 
isolation.level=read_uncommitted, I get the records. I assume that the records 
are not getting committed. What could be the problem? The log is attached.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
