hachikuji commented on a change in pull request #9815:
URL: https://github.com/apache/kafka/pull/9815#discussion_r555286063
##########
File path: examples/src/main/java/kafka/examples/Consumer.java
##########
@@ -26,33 +25,34 @@
import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-public class Consumer extends ShutdownableThread {
+public class Consumer implements Runnable {
private final KafkaConsumer<Integer, String> consumer;
private final String topic;
private final String groupId;
private final int numMessageToConsume;
private int messageRemaining;
- private final CountDownLatch latch;
public Consumer(final String topic,
final String groupId,
final Optional<String> instanceId,
final boolean readCommitted,
final int numMessageToConsume,
- final CountDownLatch latch) {
- super("KafkaConsumerExample", false);
+ final boolean transactional) {
this.groupId = groupId;
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
instanceId.ifPresent(id ->
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
- props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.IntegerDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
+ // if consuming as part of exactly-once processor, committing will be done by the producer
Review comment:
The change makes sense to me. I don't think anything would stop the
auto-commits from going through. Even if there were such a mechanism, it seems
better to disable auto-commit explicitly.
##########
File path: examples/src/main/java/kafka/examples/Consumer.java
##########
@@ -62,34 +62,24 @@ public Consumer(final String topic,
this.topic = topic;
this.numMessageToConsume = numMessageToConsume;
this.messageRemaining = numMessageToConsume;
- this.latch = latch;
}
KafkaConsumer<Integer, String> get() {
return consumer;
}
@Override
- public void doWork() {
+ public void run() {
consumer.subscribe(Collections.singletonList(this.topic));
- ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
- for (ConsumerRecord<Integer, String> record : records) {
- System.out.println(groupId + " received message : from partition " + record.partition() + ", (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
+ while (!Thread.currentThread().isInterrupted() && messageRemaining > 0) {
Review comment:
`doWork` is just one iteration. `ShutdownableThread` has the loop. I'm
ok with the change, but we probably will need to copy over some of the shutdown
logic.
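A rough sketch of the kind of shutdown logic that could be carried over, loosely modeled on the initiate/await pattern in `ShutdownableThread`. The class name `LoopingWorkerSketch`, the `running` flag, and the counter-based `doWork` stand-in are all hypothetical and only illustrate the loop structure; they are not what the PR contains.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the example Consumer: the loop now lives in run(),
// so the class has to provide its own way to stop and to signal completion.
public class LoopingWorkerSketch implements Runnable {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final CountDownLatch shutdownComplete = new CountDownLatch(1);
    private int remaining;

    LoopingWorkerSketch(int remaining) {
        this.remaining = remaining;
    }

    // Placeholder for one poll-and-process pass (what doWork used to be).
    private void doWork() {
        remaining--;
    }

    @Override
    public void run() {
        try {
            while (running.get()
                    && !Thread.currentThread().isInterrupted()
                    && remaining > 0) {
                doWork();
            }
        } finally {
            // A real consumer would close its KafkaConsumer here.
            shutdownComplete.countDown();
        }
    }

    // Ask the loop to stop after the current iteration. With a real
    // KafkaConsumer, a blocking poll would also need consumer.wakeup().
    public void initiateShutdown() {
        running.set(false);
    }

    // Block until run() has exited or the timeout elapses.
    public boolean awaitShutdown(long timeout, TimeUnit unit) throws InterruptedException {
        return shutdownComplete.await(timeout, unit);
    }

    int remaining() {
        return remaining;
    }
}
```

A caller would start the worker on its own thread, then call `initiateShutdown()` followed by `awaitShutdown(...)` to stop it cleanly instead of relying on interruption alone.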
##########
File path: examples/src/main/java/kafka/examples/Consumer.java
##########
@@ -26,33 +25,34 @@
import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-public class Consumer extends ShutdownableThread {
+public class Consumer implements Runnable {
private final KafkaConsumer<Integer, String> consumer;
private final String topic;
private final String groupId;
private final int numMessageToConsume;
private int messageRemaining;
- private final CountDownLatch latch;
public Consumer(final String topic,
final String groupId,
final Optional<String> instanceId,
final boolean readCommitted,
final int numMessageToConsume,
- final CountDownLatch latch) {
- super("KafkaConsumerExample", false);
+ final boolean transactional) {
this.groupId = groupId;
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
instanceId.ifPresent(id ->
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
- props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.IntegerDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
+ // if consuming as part of exactly-once processor, committing will be done by the producer
+ if (transactional) {
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ } else {
+ props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
Review comment:
nit: since we are setting the auto-commit interval, perhaps we should also set
`ENABLE_AUTO_COMMIT_CONFIG` explicitly rather than rely on the default.
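
Something along these lines, as a minimal sketch. To keep it runnable without the Kafka client on the classpath, it uses the literal keys `enable.auto.commit` and `auto.commit.interval.ms` in place of the `ConsumerConfig` constants; the class and method names are hypothetical.

```java
import java.util.Properties;

public class AutoCommitConfigSketch {
    // Builds only the commit-related consumer settings. The string keys are the
    // values behind ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG and
    // ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG.
    static Properties commitProps(boolean transactional) {
        Properties props = new Properties();
        if (transactional) {
            // Offsets are committed through the producer's transaction.
            props.put("enable.auto.commit", "false");
        } else {
            // State the intent explicitly instead of relying on the default.
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
        }
        return props;
    }

    public static void main(String[] args) {
        System.out.println(commitProps(true).getProperty("enable.auto.commit"));
        System.out.println(commitProps(false).getProperty("enable.auto.commit"));
    }
}
```

Both branches then set the flag, so the interval setting never depends on an implicit default.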