[ 
https://issues.apache.org/jira/browse/KAFKA-6052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16376515#comment-16376515
 ] 

Jason Gustafson edited comment on KAFKA-6052 at 2/26/18 8:27 AM:
-----------------------------------------------------------------

I had one admittedly far-fetched idea. The inter-broker send thread, which is 
responsible for sending transaction markers, does not block for connection 
establishment. The first time a request is sent to a node, the connection is 
initiated, but unless the connection is established extremely quickly, we'll 
just fail the request and depend on it being retried some time later, after the 
connection has been established. The problem is that the request's completion 
handler is invoked immediately, which causes the request to be re-enqueued and 
the selector to be woken up. I am wondering if the wakeup might be treated 
differently on Windows: in particular, maybe it is short-circuiting the select 
operation and not allowing the connection to establish. On the next iteration 
of the send thread's loop, the same thing would happen, and we'd never be able 
to send the marker.
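
For reference, the documented java.nio behaviour behind this theory can be 
reproduced in isolation. The sketch below is not Kafka code: the class 
WakeupDemo and the method selectMillis are hypothetical names used only for 
illustration. It relies on the Selector contract that a wakeup() issued while 
no selection is in progress makes the next select() return immediately. It 
does not show whether Windows handles a pending connect any differently, 
which is the open question here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

class WakeupDemo {
    /**
     * Optionally calls wakeup(), then measures how long select(timeoutMs) blocks.
     * No channels are registered, so nothing can ever become ready.
     */
    static long selectMillis(boolean wakeupFirst, long timeoutMs) {
        try (Selector selector = Selector.open()) {
            if (wakeupFirst) {
                // A wakeup issued before select() causes the next select() to
                // return immediately instead of waiting for I/O or the timeout.
                selector.wakeup();
            }
            long start = System.nanoTime();
            selector.select(timeoutMs);
            return (System.nanoTime() - start) / 1_000_000;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("with wakeup:    " + selectMillis(true, 2000) + " ms");
        System.out.println("without wakeup: " + selectMillis(false, 2000) + " ms");
    }
}
```

If every loop iteration re-enqueues the failed request and wakes the selector, 
each select() would return early in this way, which is the pattern described 
above.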

An easy way to test this theory is to disable the wakeup and switch to a busier 
poll model. I've done this here: 
https://github.com/hachikuji/kafka/commit/7bdb499b83b74e4c263458d0f1f90d68536fe7db
If any users who are seeing this bug can test this out, I'd appreciate it.
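
As a rough illustration of what a busier poll model means, here is a minimal 
sketch. It is not the code from the linked commit: the class BusyPollSketch, 
its methods, and the short timeout are all hypothetical. The assumption is 
that new requests are simply queued without calling wakeup(), and the loop 
notices them because each select() uses a short timeout.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class BusyPollSketch {
    private final Queue<String> pending = new ConcurrentLinkedQueue<>();

    /** Queues a request. Deliberately does NOT call selector.wakeup(). */
    void enqueue(String request) {
        pending.add(request);
    }

    /**
     * Runs a fixed number of loop iterations and returns how many requests
     * were picked up. pollTimeoutMs must be positive: select(0) blocks
     * indefinitely, which would defeat the purpose of polling.
     */
    int runIterations(int iterations, long pollTimeoutMs) {
        int handled = 0;
        try (Selector selector = Selector.open()) {
            for (int i = 0; i < iterations; i++) {
                // Drain whatever was enqueued since the previous iteration.
                while (pending.poll() != null) {
                    handled++;
                }
                // Short, uninterrupted select: I/O such as a pending connect
                // gets a chance to complete, and new work is seen on the
                // next pass without any wakeup.
                selector.select(pollTimeoutMs);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return handled;
    }

    public static void main(String[] args) {
        BusyPollSketch loop = new BusyPollSketch();
        loop.enqueue("marker-1");
        loop.enqueue("marker-2");
        System.out.println("handled: " + loop.runIterations(3, 10));
    }
}
```

The trade-off is more frequent loop iterations when idle, in exchange for 
never interrupting a select() that may be waiting on connection establishment.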


> Windows: Consumers not polling when isolation.level=read_committed 
> -------------------------------------------------------------------
>
>                 Key: KAFKA-6052
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6052
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer, producer 
>    Affects Versions: 0.11.0.0
>         Environment: Windows 10. All processes running in embedded mode.
>            Reporter: Ansel Zandegran
>            Priority: Major
>              Labels: windows
>         Attachments: Prducer_Consumer.log, Separate_Logs.zip, kafka-logs.zip, 
> logFile.log
>
>
> *The same code is running fine in Linux.* I am trying to send a transactional 
> record with exactly-once semantics. These are my producer, consumer and 
> broker setups. 
> public void sendWithTTemp(String topic, EHEvent event) {
>     Properties props = new Properties();
>     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
>               "localhost:9092,localhost:9093,localhost:9094");
> //    props.put("bootstrap.servers", "34.240.248.190:9092,52.50.95.30:9092,52.50.95.30:9092");
>     props.put(ProducerConfig.ACKS_CONFIG, "all");
>     props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
>     props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
>     props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
>     props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
>     props.put("transactional.id", "TID" + transactionId.incrementAndGet());
>     props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "5000");
>     Producer<String, String> producer =
>         new KafkaProducer<>(props,
>                             new StringSerializer(),
>                             new StringSerializer());
>     Logger.log(this, "Initializing transaction...");
>     producer.initTransactions();
>     Logger.log(this, "Initializing done.");
>     try {
>       Logger.log(this, "Begin transaction...");
>       producer.beginTransaction();
>       Logger.log(this, "Begin transaction done.");
>       Logger.log(this, "Sending events...");
>       producer.send(new ProducerRecord<>(topic,
>                                          event.getKey().toString(),
>                                          event.getValue().toString()));
>       Logger.log(this, "Sending events done.");
>       Logger.log(this, "Committing...");
>       producer.commitTransaction();
>       Logger.log(this, "Committing done.");
>     } catch (ProducerFencedException | OutOfOrderSequenceException
>         | AuthorizationException e) {
>       producer.close();
>       e.printStackTrace();
>     } catch (KafkaException e) {
>       producer.abortTransaction();
>       e.printStackTrace();
>     }
>     producer.close();
>   }
> *In Consumer*
> I have set isolation.level=read_committed
> *In 3 Brokers*
> I'm running with the following properties
>       Properties props = new Properties();
>       props.setProperty("broker.id", "" + i);
>       props.setProperty("listeners", "PLAINTEXT://:909" + (2 + i));
>       props.setProperty("log.dirs", Configuration.KAFKA_DATA_PATH + "\\B" + i);
>       props.setProperty("num.partitions", "1");
>       props.setProperty("zookeeper.connect", "localhost:2181");
>       props.setProperty("zookeeper.connection.timeout.ms", "6000");
>       props.setProperty("min.insync.replicas", "2");
>       props.setProperty("offsets.topic.replication.factor", "2");
>       props.setProperty("offsets.topic.num.partitions", "1");
>       props.setProperty("transaction.state.log.num.partitions", "2");
>       props.setProperty("transaction.state.log.replication.factor", "2");
>       props.setProperty("transaction.state.log.min.isr", "2");
> I am not getting any records in the consumer. When I set 
> isolation.level=read_uncommitted, I get the records. I assume that the 
> records are not getting committed. What could be the problem? Logs attached.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
