asdaraujo commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r432587118



##########
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -272,28 +303,35 @@ public void testReplication() throws InterruptedException 
{
         waitForCondition(() -> {
             try {
                 return primaryClient.remoteConsumerOffsets("consumer-group-1", 
"backup",
-                    Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new 
TopicPartition("test-topic-1", 0));
+                    Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new 
TopicPartition("test-topic-1", 1));
             } catch (Throwable e) {
                 return false;
             }
         }, CHECKPOINT_DURATION_MS, "Offsets not translated upstream to primary 
cluster.");
 
         Map<TopicPartition, OffsetAndMetadata> primaryOffsets = 
primaryClient.remoteConsumerOffsets("consumer-group-1", "backup",
                 Duration.ofMillis(CHECKPOINT_DURATION_MS));
- 
+
         // Failback consumer group to primary cluster
-        Consumer<byte[], byte[]> consumer2 = 
primary.kafka().createConsumer(Collections.singletonMap("group.id", 
"consumer-group-1"));
-        consumer2.assign(primaryOffsets.keySet());
+        Consumer<byte[], byte[]> consumer2 = 
primary.kafka().createConsumer(consumerProps);
+        List<TopicPartition> primaryPartitions = IntStream.range(0, 
NUM_PARTITIONS)
+                .boxed()
+                .flatMap(p -> Stream.of(new TopicPartition("test-topic-1", p), 
new TopicPartition("backup.test-topic-1", p)))
+                .collect(Collectors.toList());
+        consumer2.assign(primaryPartitions);
         primaryOffsets.forEach(consumer2::seek);
-        consumer2.poll(Duration.ofMillis(500));
 
         assertTrue("Consumer failedback to zero upstream offset.", 
consumer2.position(new TopicPartition("test-topic-1", 0)) > 0);
         assertTrue("Consumer failedback to zero downstream offset.", 
consumer2.position(new TopicPartition("backup.test-topic-1", 0)) > 0);
         assertTrue("Consumer failedback beyond expected upstream offset.", 
consumer2.position(
-            new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
+            new TopicPartition("test-topic-1", 0)) <= Math.ceil((float) 
NUM_RECORDS_PRODUCED / (NUM_PARTITIONS - 1)) + Math.ceil((float) 
NUM_RECORDS_PRODUCED / NUM_PARTITIONS));

Review comment:
       Yep. I need to cast to float so that the division is performed in 
floating point rather than as integer division, which would truncate the 
fractional part. That way, whenever the quotient has a non-zero fractional 
part, ceil will round it up to the next integer.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to