abbccdda commented on a change in pull request #9815: URL: https://github.com/apache/kafka/pull/9815#discussion_r553162016
########## File path: examples/src/main/java/kafka/examples/Consumer.java ########## @@ -26,33 +25,34 @@ import java.util.Collections; import java.util.Optional; import java.util.Properties; -import java.util.concurrent.CountDownLatch; -public class Consumer extends ShutdownableThread { +public class Consumer implements Runnable { private final KafkaConsumer<Integer, String> consumer; private final String topic; private final String groupId; private final int numMessageToConsume; private int messageRemaining; - private final CountDownLatch latch; public Consumer(final String topic, final String groupId, final Optional<String> instanceId, final boolean readCommitted, final int numMessageToConsume, - final CountDownLatch latch) { - super("KafkaConsumerExample", false); + final boolean transactional) { this.groupId = groupId; Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id)); - props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); - props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + // if consuming as part of exactly-once processor, committing will be done by the producer Review comment: I'm not sure this is required here; in EOS, consumer auto-commit will not succeed anyway, right?
########## File path: examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java ########## @@ -88,40 +91,50 @@ public static void main(String[] args) throws InterruptedException, ExecutionExc /* Stage 1: topic cleanup and recreation */ recreateTopics(numPartitions); - CountDownLatch prePopulateLatch = new CountDownLatch(1); /* Stage 2: pre-populate records */ - Producer producerThread = new Producer(INPUT_TOPIC, false, null, true, numRecords, -1, prePopulateLatch); - producerThread.start(); - - if (!prePopulateLatch.await(5, TimeUnit.MINUTES)) { - throw new TimeoutException("Timeout after 5 minutes waiting for data pre-population"); + ExecutorService driver = Executors.newFixedThreadPool(2); // populating producer and validating consumer will run here + Producer producerTask = new Producer(INPUT_TOPIC, false, null, true, numRecords, -1); + try { + CompletableFuture.runAsync(producerTask, driver).get(5, TimeUnit.MINUTES); + } catch (TimeoutException e) { + System.out.println("Timeout after 5 minutes waiting for data pre-population"); + Exit.exit(1); } - CountDownLatch transactionalCopyLatch = new CountDownLatch(numInstances); + /* Stage 3: transactionally process all messages */ + ExecutorService processorThreads = Executors.newFixedThreadPool(numInstances); + CompletableFuture[] messageProcessors = new CompletableFuture[numInstances]; for (int instanceIdx = 0; instanceIdx < numInstances; instanceIdx++) { - ExactlyOnceMessageProcessor messageProcessor = new ExactlyOnceMessageProcessor( - INPUT_TOPIC, OUTPUT_TOPIC, instanceIdx, transactionalCopyLatch); - messageProcessor.start(); + messageProcessors[instanceIdx] = CompletableFuture.runAsync( + new ExactlyOnceMessageProcessor(INPUT_TOPIC, OUTPUT_TOPIC, instanceIdx), + processorThreads); } - if (!transactionalCopyLatch.await(5, TimeUnit.MINUTES)) { - throw new TimeoutException("Timeout after 5 minutes waiting for transactionally message copy"); + try { + CompletableFuture.allOf(messageProcessors).get(5, 
TimeUnit.MINUTES); + } catch (TimeoutException e) { + System.out.println("Timeout after 5 minutes waiting for transactional message copy"); + Exit.exit(1); } - CountDownLatch consumeLatch = new CountDownLatch(1); - - /* Stage 4: consume all processed messages to verify exactly once */ - Consumer consumerThread = new Consumer(OUTPUT_TOPIC, "Verify-consumer", Optional.empty(), true, numRecords, consumeLatch); - consumerThread.start(); + /* Stage 4: consume all processed messages to verify exactly once. + Consumer uses read committed to guarantee that uncommitted events will not be included in verification Review comment: s/Consumer/The verifying consumer ########## File path: examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java ########## @@ -88,40 +91,50 @@ public static void main(String[] args) throws InterruptedException, ExecutionExc /* Stage 1: topic cleanup and recreation */ recreateTopics(numPartitions); - CountDownLatch prePopulateLatch = new CountDownLatch(1); /* Stage 2: pre-populate records */ - Producer producerThread = new Producer(INPUT_TOPIC, false, null, true, numRecords, -1, prePopulateLatch); - producerThread.start(); - - if (!prePopulateLatch.await(5, TimeUnit.MINUTES)) { - throw new TimeoutException("Timeout after 5 minutes waiting for data pre-population"); + ExecutorService driver = Executors.newFixedThreadPool(2); // populating producer and validating consumer will run here + Producer producerTask = new Producer(INPUT_TOPIC, false, null, true, numRecords, -1); + try { + CompletableFuture.runAsync(producerTask, driver).get(5, TimeUnit.MINUTES); + } catch (TimeoutException e) { + System.out.println("Timeout after 5 minutes waiting for data pre-population"); + Exit.exit(1); } - CountDownLatch transactionalCopyLatch = new CountDownLatch(numInstances); + /* Stage 3: transactionally process all messages */ + ExecutorService processorThreads = Executors.newFixedThreadPool(numInstances); + CompletableFuture[] messageProcessors = new 
CompletableFuture[numInstances]; for (int instanceIdx = 0; instanceIdx < numInstances; instanceIdx++) { - ExactlyOnceMessageProcessor messageProcessor = new ExactlyOnceMessageProcessor( - INPUT_TOPIC, OUTPUT_TOPIC, instanceIdx, transactionalCopyLatch); - messageProcessor.start(); + messageProcessors[instanceIdx] = CompletableFuture.runAsync( + new ExactlyOnceMessageProcessor(INPUT_TOPIC, OUTPUT_TOPIC, instanceIdx), + processorThreads); } - if (!transactionalCopyLatch.await(5, TimeUnit.MINUTES)) { - throw new TimeoutException("Timeout after 5 minutes waiting for transactionally message copy"); + try { + CompletableFuture.allOf(messageProcessors).get(5, TimeUnit.MINUTES); + } catch (TimeoutException e) { + System.out.println("Timeout after 5 minutes waiting for transactional message copy"); + Exit.exit(1); } - CountDownLatch consumeLatch = new CountDownLatch(1); - - /* Stage 4: consume all processed messages to verify exactly once */ - Consumer consumerThread = new Consumer(OUTPUT_TOPIC, "Verify-consumer", Optional.empty(), true, numRecords, consumeLatch); - consumerThread.start(); + /* Stage 4: consume all processed messages to verify exactly once. 
+ Consumer uses read committed to guarantee that uncommitted events will not be included in verification + but the consumer is not part of the transaction itself Review comment: Suggest rewording this line to: "while the verifying consumer itself is not part of the transaction" ########## File path: examples/src/main/java/kafka/examples/Consumer.java ########## @@ -62,34 +62,24 @@ public Consumer(final String topic, this.topic = topic; this.numMessageToConsume = numMessageToConsume; this.messageRemaining = numMessageToConsume; - this.latch = latch; } KafkaConsumer<Integer, String> get() { return consumer; } @Override - public void doWork() { + public void run() { consumer.subscribe(Collections.singletonList(this.topic)); - ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1)); - for (ConsumerRecord<Integer, String> record : records) { - System.out.println(groupId + " received message : from partition " + record.partition() + ", (" + record.key() + ", " + record.value() + ") at offset " + record.offset()); + while (!Thread.currentThread().isInterrupted() && messageRemaining > 0) { Review comment: So previously this example only ran once? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org