This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a5739f076 Enhance PulsarConsumerTest (#12948)
6a5739f076 is described below

commit 6a5739f076107df16f303c1e0ea0e082a94f8004
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu Apr 18 13:22:21 2024 -0700

    Enhance PulsarConsumerTest (#12948)
---
 .../plugin/stream/pulsar/PulsarConsumerTest.java   | 187 +++++++++------------
 1 file changed, 80 insertions(+), 107 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
index 01cd5cd26e..1baf212f17 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.spi.stream.BytesStreamMessage;
 import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
@@ -31,7 +32,7 @@ import org.apache.pinot.spi.stream.StreamConsumerFactory;
 import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
 import org.apache.pinot.spi.stream.StreamMessageMetadata;
 import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Topics;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRouter;
@@ -39,7 +40,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TopicMetadata;
-import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.testcontainers.containers.PulsarContainer;
 import org.testcontainers.utility.DockerImageName;
 import org.testng.annotations.AfterClass;
@@ -49,6 +50,7 @@ import org.testng.annotations.Test;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 
 
 public class PulsarConsumerTest {
@@ -62,103 +64,85 @@ public class PulsarConsumerTest {
   public static final int NUM_PARTITIONS = 2;
   public static final int NUM_RECORDS_PER_PARTITION = 1000;
   public static final int BATCH_SIZE = 10;
-  public static final int CONSUMER_FETCH_TIMEOUT_MILLIS = (int) 
TimeUnit.MINUTES.toMillis(1);
+  public static final int CONSUMER_FETCH_TIMEOUT_MILLIS = (int) 
TimeUnit.SECONDS.toMillis(1);
 
   private final List<List<MessageId>> _partitionToMessageIdMapping = new 
ArrayList<>(NUM_PARTITIONS);
   private final List<List<MessageId>> _partitionToMessageIdMappingBatch = new 
ArrayList<>(NUM_PARTITIONS);
 
   private PulsarContainer _pulsar;
-  private PulsarClient _pulsarClient;
 
   @BeforeClass
   public void setUp()
       throws Exception {
     _pulsar = new 
PulsarContainer(PULSAR_IMAGE).withStartupTimeout(Duration.ofMinutes(5));
-    try {
-      _pulsar.start();
-      _pulsarClient = 
PulsarClient.builder().serviceUrl(_pulsar.getPulsarBrokerUrl()).build();
-
-      try (PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(_pulsar.getHttpServiceUrl()).build()) {
-        createTopics(admin);
-        publishRecords();
-        publishRecordsBatch();
-        waitForMessagesToPublish(admin, TEST_TOPIC);
-        waitForMessagesToPublish(admin, TEST_TOPIC_BATCH);
-      }
-    } catch (Exception e) {
-      _pulsar.stop();
-      throw new RuntimeException("Failed to setUp test environment", e);
+    _pulsar.start();
+    try (PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(_pulsar.getHttpServiceUrl()).build()) {
+      Topics topics = admin.topics();
+      topics.createPartitionedTopic(TEST_TOPIC, NUM_PARTITIONS);
+      topics.createPartitionedTopic(TEST_TOPIC_BATCH, NUM_PARTITIONS);
     }
-  }
-
-  private void createTopics(PulsarAdmin admin)
-      throws PulsarAdminException {
-    InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies();
-    inactiveTopicPolicies.setDeleteWhileInactive(false);
-    admin.namespaces().setInactiveTopicPolicies("public/default", 
inactiveTopicPolicies);
-
-    admin.topics().createPartitionedTopic(TEST_TOPIC, NUM_PARTITIONS);
-    admin.topics().createPartitionedTopic(TEST_TOPIC_BATCH, NUM_PARTITIONS);
-  }
-
-  private void waitForMessagesToPublish(PulsarAdmin admin, String topicName)
-      throws Exception {
-    long endTimeMs = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5);
-    while (System.currentTimeMillis() < endTimeMs) {
-      if (admin.topics().getPartitionedStats(topicName, 
false).getMsgInCounter()
-          == NUM_RECORDS_PER_PARTITION * NUM_PARTITIONS) {
-        return;
-      }
-      Thread.sleep(1000);
+    try (PulsarClient client = 
PulsarClient.builder().serviceUrl(_pulsar.getPulsarBrokerUrl()).build()) {
+      publishRecords(client);
+      publishRecordsBatch(client);
     }
-    throw new RuntimeException("Failed to publish messages to topic: " + 
topicName);
   }
 
   @AfterClass
   public void tearDown()
       throws Exception {
-    _pulsarClient.close();
     _pulsar.stop();
   }
 
-  public void publishRecords()
+  public void publishRecords(PulsarClient client)
       throws Exception {
     for (int p = 0; p < NUM_PARTITIONS; p++) {
       List<MessageId> messageIds = new ArrayList<>(NUM_RECORDS_PER_PARTITION);
       _partitionToMessageIdMapping.add(messageIds);
       int partition = p;
-      try (Producer<String> producer = 
_pulsarClient.newProducer(Schema.STRING).topic(TEST_TOPIC)
+      try (Producer<String> producer = 
client.newProducer(Schema.STRING).topic(TEST_TOPIC)
           .messageRouter(new MessageRouter() {
             @Override
             public int choosePartition(Message<?> msg, TopicMetadata metadata) 
{
               return partition;
             }
-          }).create()) {
+          }).enableBatching(false).create()) {
+        List<Future<MessageId>> futures = new 
ArrayList<>(NUM_RECORDS_PER_PARTITION);
         for (int i = 0; i < NUM_RECORDS_PER_PARTITION; i++) {
-          messageIds.add(producer.send(MESSAGE_PREFIX + i));
+          futures.add(producer.sendAsync(MESSAGE_PREFIX + i));
         }
         producer.flush();
+        for (int i = 0; i < NUM_RECORDS_PER_PARTITION; i++) {
+          MessageId messageId = futures.get(i).get();
+          assertFalse(messageId instanceof BatchMessageIdImpl);
+          messageIds.add(messageId);
+        }
       }
     }
   }
 
-  public void publishRecordsBatch()
+  public void publishRecordsBatch(PulsarClient client)
       throws Exception {
     for (int p = 0; p < NUM_PARTITIONS; p++) {
       List<MessageId> messageIds = new ArrayList<>(NUM_RECORDS_PER_PARTITION);
       _partitionToMessageIdMappingBatch.add(messageIds);
       int partition = p;
-      try (Producer<String> producer = 
_pulsarClient.newProducer(Schema.STRING).topic(TEST_TOPIC_BATCH)
+      try (Producer<String> producer = 
client.newProducer(Schema.STRING).topic(TEST_TOPIC_BATCH)
           .messageRouter(new MessageRouter() {
             @Override
             public int choosePartition(Message<?> msg, TopicMetadata metadata) 
{
               return partition;
             }
           }).batchingMaxMessages(BATCH_SIZE).batchingMaxPublishDelay(1, 
TimeUnit.SECONDS).create()) {
+        List<Future<MessageId>> futures = new 
ArrayList<>(NUM_RECORDS_PER_PARTITION);
         for (int i = 0; i < NUM_RECORDS_PER_PARTITION; i++) {
-          messageIds.add(producer.send(MESSAGE_PREFIX + i));
+          futures.add(producer.sendAsync(MESSAGE_PREFIX + i));
         }
         producer.flush();
+        for (int i = 0; i < NUM_RECORDS_PER_PARTITION; i++) {
+          MessageId messageId = futures.get(i).get();
+          assertTrue(messageId instanceof BatchMessageIdImpl);
+          messageIds.add(messageId);
+        }
       }
     }
   }
@@ -179,92 +163,81 @@ public class PulsarConsumerTest {
   public void testPartitionLevelConsumer()
       throws Exception {
     StreamConsumerFactory streamConsumerFactory = 
StreamConsumerFactoryProvider.create(getStreamConfig(TEST_TOPIC));
-    int numPartitions;
     try (PulsarStreamMetadataProvider metadataProvider = new 
PulsarStreamMetadataProvider(CLIENT_ID,
         getStreamConfig(TEST_TOPIC))) {
-      numPartitions = 
metadataProvider.fetchPartitionCount(CONSUMER_FETCH_TIMEOUT_MILLIS);
+      
assertEquals(metadataProvider.fetchPartitionCount(CONSUMER_FETCH_TIMEOUT_MILLIS),
 NUM_PARTITIONS);
     }
-
-    for (int partition = 0; partition < numPartitions; partition++) {
+    for (int i = 0; i < NUM_PARTITIONS; i++) {
+      List<MessageId> messageIds = _partitionToMessageIdMapping.get(i);
       PartitionGroupConsumptionStatus partitionGroupConsumptionStatus =
-          new PartitionGroupConsumptionStatus(partition, 0, new 
MessageIdStreamOffset(MessageId.earliest), null,
-              "CONSUMING");
+          new PartitionGroupConsumptionStatus(i, 0, new 
MessageIdStreamOffset(MessageId.earliest), null, "CONSUMING");
       try (
           PulsarPartitionLevelConsumer consumer =
               (PulsarPartitionLevelConsumer) 
streamConsumerFactory.createPartitionGroupConsumer(
               CLIENT_ID, partitionGroupConsumptionStatus)) {
-        PulsarMessageBatch messageBatch =
-            consumer.fetchMessages(new 
MessageIdStreamOffset(MessageId.earliest), CONSUMER_FETCH_TIMEOUT_MILLIS);
-        assertEquals(messageBatch.getMessageCount(), 1000);
-        assertFalse(messageBatch.isEndOfPartitionGroup());
-        for (int i = 0; i < 1000; i++) {
-          verifyMessage(messageBatch.getStreamMessage(i), partition, i, false);
-        }
-
-        messageBatch =
-            consumer.fetchMessages(new 
MessageIdStreamOffset(_partitionToMessageIdMapping.get(partition).get(500)),
-                CONSUMER_FETCH_TIMEOUT_MILLIS);
-        assertEquals(messageBatch.getMessageCount(), 500);
-        assertFalse(messageBatch.isEndOfPartitionGroup());
-        for (int i = 0; i < 500; i++) {
-          verifyMessage(messageBatch.getStreamMessage(i), partition, 500 + i, 
false);
-        }
+        // Start from earliest
+        testConsumer(consumer, 0, messageIds);
+        // Start from middle
+        testConsumer(consumer, 500, messageIds);
       }
     }
   }
 
-  private void verifyMessage(BytesStreamMessage streamMessage, int partition, 
int index, boolean batch) {
-    assertEquals(new String(streamMessage.getValue()), MESSAGE_PREFIX + index);
-    StreamMessageMetadata messageMetadata = streamMessage.getMetadata();
-    assertNotNull(messageMetadata);
-    MessageIdStreamOffset offset = (MessageIdStreamOffset) 
messageMetadata.getOffset();
-    assertNotNull(offset);
-    MessageIdStreamOffset nextOffset = (MessageIdStreamOffset) 
messageMetadata.getNextOffset();
-    assertNotNull(nextOffset);
-    List<MessageId> messageIds =
-        batch ? _partitionToMessageIdMappingBatch.get(partition) : 
_partitionToMessageIdMapping.get(partition);
-    assertEquals(offset.getMessageId(), messageIds.get(index));
-    if (index < NUM_RECORDS_PER_PARTITION - 1) {
-      assertEquals(nextOffset.getMessageId(), messageIds.get(index + 1));
-    }
-  }
-
   @Test
   public void testPartitionLevelConsumerBatchMessages()
       throws Exception {
     StreamConsumerFactory streamConsumerFactory =
         
StreamConsumerFactoryProvider.create(getStreamConfig(TEST_TOPIC_BATCH));
-    int numPartitions;
     try (PulsarStreamMetadataProvider metadataProvider = new 
PulsarStreamMetadataProvider(CLIENT_ID,
         getStreamConfig(TEST_TOPIC_BATCH))) {
-      numPartitions = 
metadataProvider.fetchPartitionCount(CONSUMER_FETCH_TIMEOUT_MILLIS);
+      
assertEquals(metadataProvider.fetchPartitionCount(CONSUMER_FETCH_TIMEOUT_MILLIS),
 NUM_PARTITIONS);
     }
-
-    for (int partition = 0; partition < numPartitions; partition++) {
+    for (int i = 0; i < NUM_PARTITIONS; i++) {
+      List<MessageId> messageIds = _partitionToMessageIdMappingBatch.get(i);
       PartitionGroupConsumptionStatus partitionGroupConsumptionStatus =
-          new PartitionGroupConsumptionStatus(partition, 0, new 
MessageIdStreamOffset(MessageId.earliest), null,
-              "CONSUMING");
+          new PartitionGroupConsumptionStatus(i, 0, new 
MessageIdStreamOffset(MessageId.earliest), null, "CONSUMING");
       try (
           PulsarPartitionLevelConsumer consumer =
               (PulsarPartitionLevelConsumer) 
streamConsumerFactory.createPartitionGroupConsumer(
               CLIENT_ID, partitionGroupConsumptionStatus)) {
-        PulsarMessageBatch messageBatch =
-            consumer.fetchMessages(new 
MessageIdStreamOffset(MessageId.earliest), CONSUMER_FETCH_TIMEOUT_MILLIS);
-        assertEquals(messageBatch.getMessageCount(), 1000);
-        assertFalse(messageBatch.isEndOfPartitionGroup());
-        for (int i = 0; i < 1000; i++) {
-          verifyMessage(messageBatch.getStreamMessage(i), partition, i, true);
-        }
+        // Start from earliest
+        testConsumer(consumer, 0, messageIds);
+        // Start from middle
+        testConsumer(consumer, 500, messageIds);
+      }
+    }
+  }
 
-        messageBatch =
-            consumer.fetchMessages(new 
MessageIdStreamOffset(_partitionToMessageIdMappingBatch.get(partition).get(500)),
-                CONSUMER_FETCH_TIMEOUT_MILLIS);
-        assertEquals(messageBatch.getMessageCount(), 500);
-        assertFalse(messageBatch.isEndOfPartitionGroup());
-        for (int i = 0; i < 500; i++) {
-          verifyMessage(messageBatch.getStreamMessage(i), partition, 500 + i, 
true);
-        }
+  private void testConsumer(PulsarPartitionLevelConsumer consumer, int 
startIndex, List<MessageId> messageIds) {
+    MessageId startMessageId = startIndex == 0 ? MessageId.earliest : 
messageIds.get(startIndex);
+    int numMessagesFetched = startIndex;
+    while (numMessagesFetched < NUM_RECORDS_PER_PARTITION) {
+      PulsarMessageBatch messageBatch =
+          consumer.fetchMessages(new MessageIdStreamOffset(startMessageId), 
CONSUMER_FETCH_TIMEOUT_MILLIS);
+      int messageCount = messageBatch.getMessageCount();
+      assertFalse(messageBatch.isEndOfPartitionGroup());
+      for (int i = 0; i < messageCount; i++) {
+        verifyMessage(messageBatch.getStreamMessage(i), numMessagesFetched + 
i, messageIds);
+      }
+      numMessagesFetched += messageCount;
+      if (numMessagesFetched < NUM_RECORDS_PER_PARTITION) {
+        startMessageId = messageIds.get(numMessagesFetched);
       }
     }
+    assertEquals(numMessagesFetched, NUM_RECORDS_PER_PARTITION);
+  }
+
+  private void verifyMessage(BytesStreamMessage streamMessage, int index, 
List<MessageId> messageIds) {
+    assertEquals(new String(streamMessage.getValue()), MESSAGE_PREFIX + index);
+    StreamMessageMetadata messageMetadata = streamMessage.getMetadata();
+    assertNotNull(messageMetadata);
+    MessageIdStreamOffset offset = (MessageIdStreamOffset) 
messageMetadata.getOffset();
+    assertNotNull(offset);
+    MessageIdStreamOffset nextOffset = (MessageIdStreamOffset) 
messageMetadata.getNextOffset();
+    assertNotNull(nextOffset);
+    assertEquals(offset.getMessageId(), messageIds.get(index));
+    if (index < NUM_RECORDS_PER_PARTITION - 1) {
+      assertEquals(nextOffset.getMessageId(), messageIds.get(index + 1));
+    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to