abbccdda commented on a change in pull request #9815:
URL: https://github.com/apache/kafka/pull/9815#discussion_r553162016



##########
File path: examples/src/main/java/kafka/examples/Consumer.java
##########
@@ -26,33 +25,34 @@
 import java.util.Collections;
 import java.util.Optional;
 import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
 
-public class Consumer extends ShutdownableThread {
+public class Consumer implements Runnable {
     private final KafkaConsumer<Integer, String> consumer;
     private final String topic;
     private final String groupId;
     private final int numMessageToConsume;
     private int messageRemaining;
-    private final CountDownLatch latch;
 
     public Consumer(final String topic,
                     final String groupId,
                     final Optional<String> instanceId,
                     final boolean readCommitted,
                     final int numMessageToConsume,
-                    final CountDownLatch latch) {
-        super("KafkaConsumerExample", false);
+                    final boolean transactional) {
         this.groupId = groupId;
         Properties props = new Properties();
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
         props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
         instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
-        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+        // if consuming as part of exactly-once processor, committing will be done by the producer

Review comment:
       I'm not sure this is required here; in EOS, consumer auto-commit will not be successful, right?
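
       For reference, a minimal sketch of the EOS read-process-write commit path (invented variable names, assuming a transactional producer, a subscribed consumer, and the usual org.apache.kafka.clients imports; this is not the PR's code). Offsets are committed by the producer, so consumer auto-commit never comes into play:

           producer.beginTransaction();
           Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
           for (ConsumerRecord<Integer, String> record : consumer.poll(Duration.ofMillis(200))) {
               producer.send(new ProducerRecord<>(outputTopic, record.key(), record.value()));
               // track the next offset to resume from, per partition
               offsets.put(new TopicPartition(record.topic(), record.partition()),
                           new OffsetAndMetadata(record.offset() + 1));
           }
           // offsets commit atomically with the produced records; auto-commit is unused
           producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
           producer.commitTransaction();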

##########
File path: examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
##########
@@ -88,40 +91,50 @@ public static void main(String[] args) throws InterruptedException, ExecutionExc
         /* Stage 1: topic cleanup and recreation */
         recreateTopics(numPartitions);
 
-        CountDownLatch prePopulateLatch = new CountDownLatch(1);
 
         /* Stage 2: pre-populate records */
-        Producer producerThread = new Producer(INPUT_TOPIC, false, null, true, numRecords, -1, prePopulateLatch);
-        producerThread.start();
-
-        if (!prePopulateLatch.await(5, TimeUnit.MINUTES)) {
-            throw new TimeoutException("Timeout after 5 minutes waiting for data pre-population");
+        ExecutorService driver = Executors.newFixedThreadPool(2); // populating producer and validating consumer will run here
+        Producer producerTask = new Producer(INPUT_TOPIC, false, null, true, numRecords, -1);
+        try {
+            CompletableFuture.runAsync(producerTask, driver).get(5, TimeUnit.MINUTES);
+        } catch (TimeoutException e) {
+            System.out.println("Timeout after 5 minutes waiting for data pre-population");
+            Exit.exit(1);
         }
 
-        CountDownLatch transactionalCopyLatch = new CountDownLatch(numInstances);
+
 
         /* Stage 3: transactionally process all messages */
+        ExecutorService processorThreads = Executors.newFixedThreadPool(numInstances);
+        CompletableFuture[] messageProcessors = new CompletableFuture[numInstances];
         for (int instanceIdx = 0; instanceIdx < numInstances; instanceIdx++) {
-            ExactlyOnceMessageProcessor messageProcessor = new ExactlyOnceMessageProcessor(
-                INPUT_TOPIC, OUTPUT_TOPIC, instanceIdx, transactionalCopyLatch);
-            messageProcessor.start();
+            messageProcessors[instanceIdx] = CompletableFuture.runAsync(
+                    new ExactlyOnceMessageProcessor(INPUT_TOPIC, OUTPUT_TOPIC, instanceIdx),
+                    processorThreads);
         }
 
-        if (!transactionalCopyLatch.await(5, TimeUnit.MINUTES)) {
-            throw new TimeoutException("Timeout after 5 minutes waiting for transactionally message copy");
+        try {
+            CompletableFuture.allOf(messageProcessors).get(5, TimeUnit.MINUTES);
+        } catch (TimeoutException e) {
+            System.out.println("Timeout after 5 minutes waiting for transactional message copy");
+            Exit.exit(1);
         }
 
-        CountDownLatch consumeLatch = new CountDownLatch(1);
-
-        /* Stage 4: consume all processed messages to verify exactly once */
-        Consumer consumerThread = new Consumer(OUTPUT_TOPIC, "Verify-consumer", Optional.empty(), true, numRecords, consumeLatch);
-        consumerThread.start();
+        /* Stage 4: consume all processed messages to verify exactly once.
+        Consumer uses read committed to guarantee that uncommitted events will not be included in verification

Review comment:
       s/Consumer/The verifying consumer
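
       For context, the orchestration in Stages 2 and 3 above is the standard allOf-plus-timeout pattern; a self-contained sketch of just that pattern (hypothetical task bodies, not the demo code):

           import java.util.concurrent.*;

           public class AllOfTimeoutSketch {
               public static void main(String[] args) throws Exception {
                   ExecutorService pool = Executors.newFixedThreadPool(3);
                   CompletableFuture<?>[] tasks = new CompletableFuture[3];
                   for (int i = 0; i < tasks.length; i++) {
                       final int id = i;
                       tasks[i] = CompletableFuture.runAsync(() -> System.out.println("worker " + id + " done"), pool);
                   }
                   try {
                       // bound the whole batch with a single timeout, as the demo does
                       CompletableFuture.allOf(tasks).get(5, TimeUnit.MINUTES);
                   } catch (TimeoutException e) {
                       System.out.println("Timeout waiting for workers");
                       System.exit(1);
                   } finally {
                       pool.shutdown();
                   }
               }
           }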

##########
File path: examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
##########
@@ -88,40 +91,50 @@ public static void main(String[] args) throws InterruptedException, ExecutionExc
         /* Stage 1: topic cleanup and recreation */
         recreateTopics(numPartitions);
 
-        CountDownLatch prePopulateLatch = new CountDownLatch(1);
 
         /* Stage 2: pre-populate records */
-        Producer producerThread = new Producer(INPUT_TOPIC, false, null, true, numRecords, -1, prePopulateLatch);
-        producerThread.start();
-
-        if (!prePopulateLatch.await(5, TimeUnit.MINUTES)) {
-            throw new TimeoutException("Timeout after 5 minutes waiting for data pre-population");
+        ExecutorService driver = Executors.newFixedThreadPool(2); // populating producer and validating consumer will run here
+        Producer producerTask = new Producer(INPUT_TOPIC, false, null, true, numRecords, -1);
+        try {
+            CompletableFuture.runAsync(producerTask, driver).get(5, TimeUnit.MINUTES);
+        } catch (TimeoutException e) {
+            System.out.println("Timeout after 5 minutes waiting for data pre-population");
+            Exit.exit(1);
         }
 
-        CountDownLatch transactionalCopyLatch = new CountDownLatch(numInstances);
+
 
         /* Stage 3: transactionally process all messages */
+        ExecutorService processorThreads = Executors.newFixedThreadPool(numInstances);
+        CompletableFuture[] messageProcessors = new CompletableFuture[numInstances];
         for (int instanceIdx = 0; instanceIdx < numInstances; instanceIdx++) {
-            ExactlyOnceMessageProcessor messageProcessor = new ExactlyOnceMessageProcessor(
-                INPUT_TOPIC, OUTPUT_TOPIC, instanceIdx, transactionalCopyLatch);
-            messageProcessor.start();
+            messageProcessors[instanceIdx] = CompletableFuture.runAsync(
+                    new ExactlyOnceMessageProcessor(INPUT_TOPIC, OUTPUT_TOPIC, instanceIdx),
+                    processorThreads);
         }
 
-        if (!transactionalCopyLatch.await(5, TimeUnit.MINUTES)) {
-            throw new TimeoutException("Timeout after 5 minutes waiting for transactionally message copy");
+        try {
+            CompletableFuture.allOf(messageProcessors).get(5, TimeUnit.MINUTES);
+        } catch (TimeoutException e) {
+            System.out.println("Timeout after 5 minutes waiting for transactional message copy");
+            Exit.exit(1);
        }
 
-        CountDownLatch consumeLatch = new CountDownLatch(1);
-
-        /* Stage 4: consume all processed messages to verify exactly once */
-        Consumer consumerThread = new Consumer(OUTPUT_TOPIC, "Verify-consumer", Optional.empty(), true, numRecords, consumeLatch);
-        consumerThread.start();
+        /* Stage 4: consume all processed messages to verify exactly once.
+        Consumer uses read committed to guarantee that uncommitted events will not be included in verification
+        but the consumer is not part of the transaction itself

Review comment:
       while the verifying consumer itself is not part of the transaction
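
       In other words, the verifier only needs read_committed isolation; a minimal sketch of such a consumer's configuration (broker address and group name are made up, the config keys are the real ConsumerConfig constants, and the usual clients/serialization imports are assumed):

           Properties props = new Properties();
           props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           props.put(ConsumerConfig.GROUP_ID_CONFIG, "verify-consumer");
           // read_committed hides records from aborted or still-open transactions,
           // without making this consumer a participant in any transaction
           props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
           props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
           props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
           KafkaConsumer<Integer, String> verifier = new KafkaConsumer<>(props);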

##########
File path: examples/src/main/java/kafka/examples/Consumer.java
##########
@@ -62,34 +62,24 @@ public Consumer(final String topic,
         this.topic = topic;
         this.numMessageToConsume = numMessageToConsume;
         this.messageRemaining = numMessageToConsume;
-        this.latch = latch;
     }
 
     KafkaConsumer<Integer, String> get() {
         return consumer;
     }
 
     @Override
-    public void doWork() {
+    public void run() {
         consumer.subscribe(Collections.singletonList(this.topic));
-        ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
-        for (ConsumerRecord<Integer, String> record : records) {
-            System.out.println(groupId + " received message : from partition " + record.partition() + ", (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
+        while (!Thread.currentThread().isInterrupted() &&  messageRemaining > 0) {

Review comment:
       So previously this example only ran once?
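
       As I understand kafka.utils.ShutdownableThread, its run loop keeps invoking doWork() until shutdown, so each doWork() call polled once but the thread kept polling; a simplified Java rendering of that driver loop (the real base class is Scala, so this is a sketch, not its source):

           // Simplified sketch of the ShutdownableThread driver loop
           @Override
           public void run() {
               while (isRunning()) {   // loops until shutdown is initiated
                   doWork();           // so each doWork() call polls exactly once
               }
           }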




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

