This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b3b0b9ea8a In many places in the code, the entire response String is 
read and then converted into JSON. Change that to populate the JSON directly 
from the response stream to reduce memory overhead. (#11465)
b3b0b9ea8a is described below

commit b3b0b9ea8ab99e66cb77afdd8eabc1bb03cb448b
Author: soumitra-st <[email protected]>
AuthorDate: Thu Aug 31 00:26:06 2023 -0700

    In many places in the code, the entire response String is read and then 
converted into JSON. Change that to populate the JSON directly from the 
response stream to reduce memory overhead. (#11465)
---
 .../src/main/java/org/apache/pinot/client/BrokerCache.java   |  4 +---
 .../pinot/client/JsonAsyncHttpPinotClientTransport.java      |  7 ++-----
 .../client/controller/response/ControllerResponseFuture.java | 11 +++++------
 .../controller/response/ControllerTenantBrokerResponse.java  | 12 +++++-------
 .../pinot/client/controller/response/SchemaResponse.java     |  9 ++++-----
 .../pinot/client/controller/response/TableResponse.java      | 12 +++++-------
 .../src/main/java/org/apache/pinot/spi/utils/JsonUtils.java  |  5 +++++
 7 files changed, 27 insertions(+), 33 deletions(-)

diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCache.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCache.java
index 530f59958d..759dd32084 100644
--- 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCache.java
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCache.java
@@ -22,7 +22,6 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.core.type.TypeReference;
 import io.netty.handler.ssl.ClientAuth;
 import io.netty.handler.ssl.JdkSslContext;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -136,8 +135,7 @@ public class BrokerCache {
 
     Future<Response> responseFuture = getRequest.addHeader("accept", 
"application/json").execute();
     Response response = responseFuture.get();
-    String responseBody = response.getResponseBody(StandardCharsets.UTF_8);
-    return JsonUtils.stringToObject(responseBody, RESPONSE_TYPE_REF);
+    return JsonUtils.inputStreamToObject(response.getResponseBodyAsStream(), 
RESPONSE_TYPE_REF);
   }
 
   private BrokerData getBrokerData(Map<String, List<BrokerInstance>> 
responses) {
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransport.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransport.java
index 2778dac075..e60d6ca023 100644
--- 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransport.java
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransport.java
@@ -18,14 +18,12 @@
  */
 package org.apache.pinot.client;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectReader;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import io.netty.handler.ssl.ClientAuth;
 import io.netty.handler.ssl.JdkSslContext;
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -127,10 +125,9 @@ public class JsonAsyncHttpPinotClientTransport implements 
PinotClientTransport<C
                   "Pinot returned HTTP status " + httpResponse.getStatusCode() 
+ ", expected 200");
             }
 
-            String responseBody = 
httpResponse.getResponseBody(StandardCharsets.UTF_8);
             try {
-              return 
BrokerResponse.fromJson(OBJECT_READER.readTree(responseBody));
-            } catch (JsonProcessingException e) {
+              return 
BrokerResponse.fromJson(OBJECT_READER.readTree(httpResponse.getResponseBodyAsStream()));
+            } catch (IOException e) {
               throw new CompletionException(e);
             }
           });
diff --git 
a/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/controller/response/ControllerResponseFuture.java
 
b/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/controller/response/ControllerResponseFuture.java
index 1bc0470d75..9899eeb2d5 100644
--- 
a/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/controller/response/ControllerResponseFuture.java
+++ 
b/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/controller/response/ControllerResponseFuture.java
@@ -18,10 +18,11 @@
  */
 package org.apache.pinot.client.controller.response;
 
-import java.nio.charset.StandardCharsets;
+import java.io.InputStream;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.apache.pinot.client.PinotClientException;
 import org.asynchttpclient.Response;
 import org.slf4j.Logger;
@@ -62,7 +63,7 @@ abstract class ControllerResponseFuture<T> implements 
Future<T> {
   abstract public T get(long timeout, TimeUnit unit)
       throws ExecutionException;
 
-  public String getStringResponse(long timeout, TimeUnit unit)
+  public InputStream getStreamResponse(long timeout, TimeUnit unit)
       throws ExecutionException {
     try {
       LOGGER.debug("Sending request to {}", _url);
@@ -75,10 +76,8 @@ abstract class ControllerResponseFuture<T> implements 
Future<T> {
         throw new PinotClientException("Pinot returned HTTP status " + 
httpResponse.getStatusCode() + ", expected 200");
       }
 
-      String responseBody = 
httpResponse.getResponseBody(StandardCharsets.UTF_8);
-
-      return responseBody;
-    } catch (Exception e) {
+      return httpResponse.getResponseBodyAsStream();
+    } catch (TimeoutException | InterruptedException e) {
       throw new ExecutionException(e);
     }
   }
diff --git 
a/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/controller/response/ControllerTenantBrokerResponse.java
 
b/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/controller/response/ControllerTenantBrokerResponse.java
index 68d212aade..1b537aad42 100644
--- 
a/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/controller/response/ControllerTenantBrokerResponse.java
+++ 
b/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/controller/response/ControllerTenantBrokerResponse.java
@@ -20,6 +20,7 @@ package org.apache.pinot.client.controller.response;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
@@ -74,16 +75,13 @@ public class ControllerTenantBrokerResponse {
     @Override
     public ControllerTenantBrokerResponse get(long timeout, TimeUnit unit)
         throws ExecutionException {
-      String response = getStringResponse(timeout, unit);
       try {
-        JsonNode jsonResponse = JsonUtils.stringToJsonNode(response);
-        ControllerTenantBrokerResponse tableResponse = 
ControllerTenantBrokerResponse.fromJson(jsonResponse);
-        return tableResponse;
+        InputStream response = getStreamResponse(timeout, unit);
+        JsonNode jsonResponse = JsonUtils.inputStreamToJsonNode(response);
+        return ControllerTenantBrokerResponse.fromJson(jsonResponse);
       } catch (IOException e) {
-        new ExecutionException(e);
+        throw new ExecutionException(e);
       }
-
-      return null;
     }
   }
 }
diff --git 
a/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/controller/response/SchemaResponse.java
 
b/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/controller/response/SchemaResponse.java
index 27116a634a..5cdbe49050 100644
--- 
a/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/controller/response/SchemaResponse.java
+++ 
b/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/controller/response/SchemaResponse.java
@@ -20,6 +20,7 @@ package org.apache.pinot.client.controller.response;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -75,15 +76,13 @@ public class SchemaResponse {
     @Override
     public SchemaResponse get(long timeout, TimeUnit unit)
         throws ExecutionException {
-      String response = getStringResponse(timeout, unit);
       try {
-        JsonNode jsonResponse = JsonUtils.stringToJsonNode(response);
+        InputStream response = getStreamResponse(timeout, unit);
+        JsonNode jsonResponse = JsonUtils.inputStreamToJsonNode(response);
         return SchemaResponse.fromJson(jsonResponse);
       } catch (IOException e) {
-        new ExecutionException(e);
+        throw new ExecutionException(e);
       }
-
-      return null;
     }
   }
 }
diff --git 
a/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/controller/response/TableResponse.java
 
b/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/controller/response/TableResponse.java
index 7e4a1c35fc..56ea973e9c 100644
--- 
a/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/controller/response/TableResponse.java
+++ 
b/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/controller/response/TableResponse.java
@@ -20,6 +20,7 @@ package org.apache.pinot.client.controller.response;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
@@ -78,16 +79,13 @@ public class TableResponse {
     @Override
     public TableResponse get(long timeout, TimeUnit unit)
         throws ExecutionException {
-      String response = getStringResponse(timeout, unit);
       try {
-        JsonNode jsonResponse = JsonUtils.stringToJsonNode(response);
-        TableResponse tableResponse = TableResponse.fromJson(jsonResponse);
-        return tableResponse;
+        InputStream response = getStreamResponse(timeout, unit);
+        JsonNode jsonResponse = JsonUtils.inputStreamToJsonNode(response);
+        return TableResponse.fromJson(jsonResponse);
       } catch (IOException e) {
-        new ExecutionException(e);
+        throw new ExecutionException(e);
       }
-
-      return null;
     }
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
index a6dbd0883e..9257bce208 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
@@ -196,6 +196,11 @@ public class JsonUtils {
     return DEFAULT_READER.forType(valueType).readValue(jsonInputStream);
   }
 
+  public static <T> T inputStreamToObject(InputStream jsonInputStream, 
TypeReference<T> valueTypeRef)
+      throws IOException {
+    return DEFAULT_READER.forType(valueTypeRef).readValue(jsonInputStream);
+  }
+
   public static JsonNode inputStreamToJsonNode(InputStream jsonInputStream)
       throws IOException {
     return DEFAULT_READER.readTree(jsonInputStream);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to