mimaison commented on a change in pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#discussion_r462989546
##########
File path:
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -345,15 +342,24 @@ private void
waitForConsumerGroupOffsetSync(Consumer<byte[], byte[]> consumer, L
}, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not
complete in time");
}
+ private void waitForConsumingAllRecords(Consumer<byte[], byte[]> consumer)
throws InterruptedException {
+ final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
+ waitForCondition(() -> {
+ ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
+ consumer.commitSync();
+ return NUM_RECORDS_PRODUCED ==
totalConsumedRecords.addAndGet(records.count());
+ }, RECORD_CONSUME_DURATION_MS, "Consumer cannot consume all the
records in time");
+ }
+
@Test
public void testOneWayReplicationWithAutoOffsetSync() throws
InterruptedException {
// create consumers before starting the connectors so we don't need to
wait for discovery
- Consumer<byte[], byte[]> consumer1 =
primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
- "group.id", "consumer-group-1"), "test-topic-1");
- consumer1.poll(Duration.ofMillis(500));
- consumer1.commitSync();
- consumer1.close();
+ try (Consumer<byte[], byte[]> consumer1 =
primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+ "group.id", "consumer-group-1"), "test-topic-1")) {
+ // we need to wait for consuming all the records for MM2
replicaing the expected offsets
Review comment:
`replicaing` -> `replicating`
##########
File path:
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -345,15 +342,24 @@ private void
waitForConsumerGroupOffsetSync(Consumer<byte[], byte[]> consumer, L
}, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not
complete in time");
}
+ private void waitForConsumingAllRecords(Consumer<byte[], byte[]> consumer)
throws InterruptedException {
+ final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
+ waitForCondition(() -> {
+ ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
Review comment:
Can we add the types `<byte[], byte[]>` to `ConsumerRecords`?
##########
File path:
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -387,11 +393,11 @@ public void testOneWayReplicationWithAutoOffsetSync()
throws InterruptedExceptio
}
// create a consumer at primary cluster to consume the new topic
- consumer1 =
primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
- "group.id", "consumer-group-1"), "test-topic-2");
- consumer1.poll(Duration.ofMillis(500));
- consumer1.commitSync();
- consumer1.close();
+ try (Consumer<byte[], byte[]> consumer1 =
primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+ "group.id", "consumer-group-1"), "test-topic-2")) {
+ // we need to wait for consuming all the records for MM2
replicaing the expected offsets
Review comment:
`replicaing` -> `replicating`
##########
File path:
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -345,15 +342,24 @@ private void
waitForConsumerGroupOffsetSync(Consumer<byte[], byte[]> consumer, L
}, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not
complete in time");
}
+ private void waitForConsumingAllRecords(Consumer<byte[], byte[]> consumer)
throws InterruptedException {
+ final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
+ waitForCondition(() -> {
+ ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
+ consumer.commitSync();
Review comment:
We can move that line after the `waitForCondition()` block so that we
commit only once, after all records have been consumed.
##########
File path:
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -345,15 +342,24 @@ private void
waitForConsumerGroupOffsetSync(Consumer<byte[], byte[]> consumer, L
}, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not
complete in time");
}
+ private void waitForConsumingAllRecords(Consumer<byte[], byte[]> consumer)
throws InterruptedException {
+ final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
+ waitForCondition(() -> {
+ ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
+ consumer.commitSync();
+ return NUM_RECORDS_PRODUCED ==
totalConsumedRecords.addAndGet(records.count());
+ }, RECORD_CONSUME_DURATION_MS, "Consumer cannot consume all the
records in time");
Review comment:
nit: The sentence sounds slightly better if you remove `the`, i.e. "Consumer cannot consume all records in time".
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org