This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e8333ab24 Flaky test fix. Query only 1 broker to test quota split 
(#13771)
6e8333ab24 is described below

commit 6e8333ab241abcae3ef8a555e3c221b72c0523ed
Author: Shounak kulkarni <[email protected]>
AuthorDate: Fri Sep 6 19:07:17 2024 +0530

    Flaky test fix. Query only 1 broker to test quota split (#13771)
---
 .../tests/QueryQuotaClusterIntegrationTest.java    | 80 +++++++++++++++++++---
 1 file changed, 70 insertions(+), 10 deletions(-)

diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
index d1fb956f2c..dfd9d39727 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
@@ -18,13 +18,18 @@
  */
 package org.apache.pinot.integration.tests;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
 import java.net.URI;
+import java.util.Iterator;
 import java.util.Properties;
 import org.apache.pinot.broker.broker.helix.BaseBrokerStarter;
 import 
org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManagerTest;
+import org.apache.pinot.client.BrokerResponse;
 import org.apache.pinot.client.ConnectionFactory;
 import org.apache.pinot.client.JsonAsyncHttpPinotClientTransportFactory;
 import org.apache.pinot.client.PinotClientException;
+import org.apache.pinot.client.PinotClientTransport;
 import org.apache.pinot.client.ResultSetGroup;
 import org.apache.pinot.common.utils.http.HttpClient;
 import org.apache.pinot.spi.config.table.QuotaConfig;
@@ -46,6 +51,9 @@ import static org.testng.Assert.assertTrue;
  * tested as part of {@link HelixExternalViewBasedQueryQuotaManagerTest}
  */
 public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest {
+  private PinotClientTransport _pinotClientTransport;
+  private String _brokerHostPort;
+
   @BeforeClass
   public void setUp()
       throws Exception {
@@ -56,6 +64,7 @@ public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest
     startController();
     startBrokers(1);
     startServers(1);
+    _brokerHostPort = LOCAL_HOST + ":" + _brokerPorts.get(0);
 
     // Create and upload the schema and table config
     Schema schema = createSchema();
@@ -65,9 +74,11 @@ public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest
 
     Properties properties = new Properties();
     properties.put(FAIL_ON_EXCEPTIONS, "FALSE");
+    _pinotClientTransport = new JsonAsyncHttpPinotClientTransportFactory()
+        .withConnectionProperties(getPinotConnectionProperties())
+        .buildTransport();
     _pinotConnection = ConnectionFactory.fromZookeeper(properties, getZkUrl() 
+ "/" + getHelixClusterName(),
-        new 
JsonAsyncHttpPinotClientTransportFactory().withConnectionProperties(getPinotConnectionProperties())
-            .buildTransport());
+        _pinotClientTransport);
   }
 
   @AfterMethod
@@ -76,6 +87,8 @@ public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest
     addQueryQuotaToClusterConfig(null);
     addQueryQuotaToDatabaseConfig(null);
     addQueryQuotaToTableConfig(null);
+    _brokerHostPort = LOCAL_HOST + ":" + _brokerPorts.get(0);
+    verifyQuotaUpdate(0);
   }
 
   @Test
@@ -125,12 +138,13 @@ public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest
       addQueryQuotaToTableConfig(10);
       // Add one more broker such that quota gets distributed equally among 
them
       brokerStarter = startOneBroker(2);
-      // to allow change propagation to QueryQuotaManager
-      Thread.sleep(1000);
-      testQueryRate(10);
+      _brokerHostPort = LOCAL_HOST + ":" + brokerStarter.getPort();
+      // query only one broker across the divided quota
+      testQueryRateOnBroker(5);
       // drop table level quota so that database quota comes into effect
       addQueryQuotaToTableConfig(null);
-      testQueryRate(25);
+      // query only one broker across the divided quota
+      testQueryRateOnBroker(12.5f);
     } finally {
       if (brokerStarter != null) {
         brokerStarter.stop();
@@ -143,19 +157,29 @@ public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest
    * Then runs the query load with double the max rate and expects queries to 
fail due to quota breach.
    * @param maxRate max rate allowed by the quota
    */
-  void testQueryRate(int maxRate)
+  void testQueryRate(float maxRate)
       throws Exception {
+    verifyQuotaUpdate(maxRate);
     runQueries(maxRate, false);
     //increase the qps and some of the queries should be throttled.
     runQueries(maxRate * 2, true);
   }
 
+  void testQueryRateOnBroker(float maxRate)
+      throws Exception {
+    verifyQuotaUpdate(maxRate);
+    runQueriesOnBroker(maxRate, false);
+    //increase the qps and some of the queries should be throttled.
+    runQueriesOnBroker(maxRate * 2, true);
+  }
+
   // try to keep the qps below 50 to ensure that the time lost between 2 query 
runs on top of the sleepMillis
   // is not comparable to sleepMillis, else the actual qps would end up being 
much lower than required qps
   private void runQueries(double qps, boolean shouldFail)
       throws Exception {
     int failCount = 0;
     long sleepMillis = (long) (1000 / qps);
+    Thread.sleep(sleepMillis);
     for (int i = 0; i < qps * 2; i++) {
       ResultSetGroup resultSetGroup = _pinotConnection.execute("SELECT 
COUNT(*) FROM " + getTableName());
       for (PinotClientException exception : resultSetGroup.getExceptions()) {
@@ -169,6 +193,45 @@ public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest
     assertTrue((failCount == 0 && !shouldFail) || (failCount != 0 && 
shouldFail));
   }
 
+  private void verifyQuotaUpdate(float quotaQps) {
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        float tableQuota = 
Float.parseFloat(sendGetRequest(String.format("http://%s/debug/tables/queryQuota/%s_OFFLINE";,
+            _brokerHostPort, getTableName())));
+        tableQuota = tableQuota == 0 ? Long.MAX_VALUE : tableQuota;
+        float dbQuota = 
Float.parseFloat(sendGetRequest(String.format("http://%s/debug/databases/queryQuota/default";,
+            _brokerHostPort)));
+        dbQuota = dbQuota == 0 ? Long.MAX_VALUE : dbQuota;
+        return quotaQps == Math.min(tableQuota, dbQuota)
+            || (quotaQps == 0 && tableQuota == Long.MAX_VALUE && dbQuota == 
Long.MAX_VALUE);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }, 5000, "Failed to reflect query quota on rate limiter in 5s");
+  }
+
+  private BrokerResponse executeQueryOnBroker(String query) {
+    return _pinotClientTransport.executeQuery(_brokerHostPort, query);
+  }
+
+  private void runQueriesOnBroker(double qps, boolean shouldFail)
+      throws Exception {
+    int failCount = 0;
+    long sleepMillis = (long) (1000 / qps);
+    Thread.sleep(sleepMillis);
+    for (int i = 0; i < qps * 2; i++) {
+      BrokerResponse resultSetGroup = executeQueryOnBroker("SELECT COUNT(*) 
FROM " + getTableName());
+      for (Iterator<JsonNode> it = resultSetGroup.getExceptions().elements(); 
it.hasNext(); ) {
+        JsonNode exception = it.next();
+        if (exception.toPrettyString().contains("QuotaExceededError")) {
+          failCount++;
+          break;
+        }
+      }
+      Thread.sleep(sleepMillis);
+    }
+    assertTrue((failCount == 0 && !shouldFail) || (failCount != 0 && 
shouldFail));
+  }
 
   public void addQueryQuotaToTableConfig(Integer maxQps)
       throws Exception {
@@ -176,7 +239,6 @@ public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest
     tableConfig.setQuotaConfig(new QuotaConfig(null, maxQps == null ? null : 
maxQps.toString()));
     updateTableConfig(tableConfig);
     // to allow change propagation to QueryQuotaManager
-    Thread.sleep(1000);
   }
 
   public void addQueryQuotaToDatabaseConfig(Integer maxQps)
@@ -187,7 +249,6 @@ public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest
     }
     HttpClient.wrapAndThrowHttpException(_httpClient.sendPostRequest(new 
URI(url), null, null));
     // to allow change propagation to QueryQuotaManager
-    Thread.sleep(1000);
   }
 
   public void addQueryQuotaToClusterConfig(Integer maxQps)
@@ -202,6 +263,5 @@ public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest
           _httpClient.sendJsonPostRequest(new 
URI(_controllerRequestURLBuilder.forClusterConfigs()), payload));
     }
     // to allow change propagation to QueryQuotaManager
-    Thread.sleep(1000);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to