artemlivshits commented on code in PR #15913:
URL: https://github.com/apache/kafka/pull/15913#discussion_r1599253246


##########
examples/src/main/java/kafka/examples/TransactionalClientDemo.java:
##########
@@ -0,0 +1,148 @@
+package kafka.examples;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.AbstractMap;
+
+
+import static java.time.Duration.ofSeconds;
+import static java.util.Collections.singleton;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.*;
+
+public class TransactionalClientDemo {
+
+    private static final String CONSUMER_GROUP_ID = "my-group-id";
+    private static final String OUTPUT_TOPIC = "output";
+    private static final String INPUT_TOPIC = "input";
+    private static KafkaConsumer<String, String> consumer;
+    private static KafkaProducer<String, String> producer;
+
+    public static void main(String[] args) {
+        initializeApplication();
+
+        boolean isRunning = true;
+        // Continuously poll for records
+        while(isRunning) {
+            try {
+                try {
+                    // Poll records from Kafka for a timeout of 60 seconds
+                    ConsumerRecords<String, String> records = 
consumer.poll(ofSeconds(60));
+
+                    // Process records to generate word count map
+                    Map<String, Integer> wordCountMap = records.records(new 
TopicPartition(INPUT_TOPIC, 0))

Review Comment:
   Not related to the point of the example, but why we only look for partition 
0?  



##########
examples/src/main/java/kafka/examples/TransactionalClientDemo.java:
##########
@@ -0,0 +1,148 @@
+package kafka.examples;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.AbstractMap;
+
+
+import static java.time.Duration.ofSeconds;
+import static java.util.Collections.singleton;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.*;
+
+public class TransactionalClientDemo {
+
+    private static final String CONSUMER_GROUP_ID = "my-group-id";
+    private static final String OUTPUT_TOPIC = "output";
+    private static final String INPUT_TOPIC = "input";
+    private static KafkaConsumer<String, String> consumer;
+    private static KafkaProducer<String, String> producer;
+
+    public static void main(String[] args) {
+        initializeApplication();
+
+        boolean isRunning = true;
+        // Continuously poll for records
+        while(isRunning) {
+            try {
+                try {
+                    // Poll records from Kafka for a timeout of 60 seconds
+                    ConsumerRecords<String, String> records = 
consumer.poll(ofSeconds(60));
+
+                    // Process records to generate word count map
+                    Map<String, Integer> wordCountMap = records.records(new 
TopicPartition(INPUT_TOPIC, 0))
+                            .stream()
+                            .flatMap(record -> 
Stream.of(record.value().split(" ")))
+                            .map(word -> new AbstractMap.SimpleEntry<>(word, 
1))
+                            .collect(Collectors.toMap(
+                                    AbstractMap.SimpleEntry::getKey,
+                                    AbstractMap.SimpleEntry::getValue,
+                                    (v1, v2) -> v1 + v2
+                            ));
+
+                    // Begin transaction
+                    producer.beginTransaction();
+
+                    // Produce word count results to output topic
+                    wordCountMap.forEach((key, value) ->
+                            producer.send(new ProducerRecord<>(OUTPUT_TOPIC, 
key, value.toString())));
+
+                    // Determine offsets to commit
+                    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = 
new HashMap<>();
+                    for (TopicPartition partition : records.partitions()) {
+                        List<ConsumerRecord<String, String>> 
partitionedRecords = records.records(partition);
+                        long offset = 
partitionedRecords.get(partitionedRecords.size() - 1).offset();
+                        offsetsToCommit.put(partition, new 
OffsetAndMetadata(offset + 1));
+                    }
+
+                    // Send offsets to transaction for atomic commit
+                    producer.sendOffsetsToTransaction(offsetsToCommit, 
CONSUMER_GROUP_ID);
+
+                    // Commit transaction
+                    producer.commitTransaction();
+                } catch (AbortableTransactionException e) {
+                    // Abortable Exception: Handle Kafka exception by aborting 
transaction. Abortable Exception should never be thrown.

Review Comment:
   "Abortable Exception should never be thrown."  -- is the desired message 
that the abortTransaction never throws abortable exceptions?



##########
examples/src/main/java/kafka/examples/TransactionalClientDemo.java:
##########
@@ -0,0 +1,148 @@
+package kafka.examples;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.AbstractMap;
+
+
+import static java.time.Duration.ofSeconds;
+import static java.util.Collections.singleton;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.*;
+
+public class TransactionalClientDemo {
+
+    private static final String CONSUMER_GROUP_ID = "my-group-id";
+    private static final String OUTPUT_TOPIC = "output";
+    private static final String INPUT_TOPIC = "input";
+    private static KafkaConsumer<String, String> consumer;
+    private static KafkaProducer<String, String> producer;
+
+    public static void main(String[] args) {
+        initializeApplication();
+
+        boolean isRunning = true;
+        // Continuously poll for records
+        while(isRunning) {
+            try {
+                try {
+                    // Poll records from Kafka for a timeout of 60 seconds
+                    ConsumerRecords<String, String> records = 
consumer.poll(ofSeconds(60));
+
+                    // Process records to generate word count map
+                    Map<String, Integer> wordCountMap = records.records(new 
TopicPartition(INPUT_TOPIC, 0))
+                            .stream()
+                            .flatMap(record -> 
Stream.of(record.value().split(" ")))
+                            .map(word -> new AbstractMap.SimpleEntry<>(word, 
1))
+                            .collect(Collectors.toMap(
+                                    AbstractMap.SimpleEntry::getKey,
+                                    AbstractMap.SimpleEntry::getValue,
+                                    (v1, v2) -> v1 + v2
+                            ));
+
+                    // Begin transaction
+                    producer.beginTransaction();
+
+                    // Produce word count results to output topic
+                    wordCountMap.forEach((key, value) ->
+                            producer.send(new ProducerRecord<>(OUTPUT_TOPIC, 
key, value.toString())));
+
+                    // Determine offsets to commit
+                    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = 
new HashMap<>();
+                    for (TopicPartition partition : records.partitions()) {
+                        List<ConsumerRecord<String, String>> 
partitionedRecords = records.records(partition);
+                        long offset = 
partitionedRecords.get(partitionedRecords.size() - 1).offset();
+                        offsetsToCommit.put(partition, new 
OffsetAndMetadata(offset + 1));
+                    }
+
+                    // Send offsets to transaction for atomic commit
+                    producer.sendOffsetsToTransaction(offsetsToCommit, 
CONSUMER_GROUP_ID);
+
+                    // Commit transaction
+                    producer.commitTransaction();
+                } catch (AbortableTransactionException e) {
+                    // Abortable Exception: Handle Kafka exception by aborting 
transaction. Abortable Exception should never be thrown.
+                    producer.abortTransaction();
+                    resetToLastCommittedPositions(consumer);
+                }
+            } catch (InvalidConfiguationTransactionException e) {
+                //  Fatal Error: The error is bubbled up to the application 
layer. The application can decide what to do
+                closeAll();
+                throw InvalidConfiguationTransactionException;

Review Comment:
   Should it be `throw e;`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to