ege-st commented on code in PR #11710:
URL: https://github.com/apache/pinot/pull/11710#discussion_r1341787880
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java:
##########
@@ -170,15 +171,18 @@ public BaseBrokerRequestHandler(PinotConfiguration
config, String brokerId, Brok
_brokerTimeoutMs = config.getProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS,
Broker.DEFAULT_BROKER_TIMEOUT_MS);
_queryResponseLimit =
config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT,
Broker.DEFAULT_BROKER_QUERY_RESPONSE_LIMIT);
+ _queryMaxSerializedResponseBytes =
config.getProperty(Broker.CONFIG_OF_MAX_QUERY_RESPONSE_SIZE_BYTES,
Review Comment:
Can we add a gauge metric which will let us know what each Broker's
threshold is set to?
##########
pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java:
##########
@@ -168,6 +169,19 @@ protected byte[]
processQueryAndSerialize(ServerQueryRequest queryRequest, Execu
byte[] responseBytes = serializeResponse(queryRequest, instanceResponse);
+ Map<String, String> queryOptions =
queryRequest.getQueryContext().getQueryOptions();
+ Long maxResponseByteLength =
QueryOptionsUtils.getMaxSerializedResponseLengthPerServer(queryOptions);
+ if (maxResponseByteLength != null && responseBytes.length >
maxResponseByteLength) {
+ String errorMessage = String.format("Serialized response exceeds
threshold for requestId %d from broker %s",
+ queryRequest.getRequestId(), queryRequest.getBrokerId());
+ LOGGER.error(errorMessage);
+ // TODO(Vivek): Add a metric indicating the number of such exceptions.
Review Comment:
Yes, having a metric to show how many messages are dropped b/c they exceed
size is critical for this feature.
##########
pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java:
##########
@@ -168,6 +169,19 @@ protected byte[]
processQueryAndSerialize(ServerQueryRequest queryRequest, Execu
byte[] responseBytes = serializeResponse(queryRequest, instanceResponse);
+ Map<String, String> queryOptions =
queryRequest.getQueryContext().getQueryOptions();
+ Long maxResponseByteLength =
QueryOptionsUtils.getMaxSerializedResponseLengthPerServer(queryOptions);
+ if (maxResponseByteLength != null && responseBytes.length >
maxResponseByteLength) {
+ String errorMessage = String.format("Serialized response exceeds
threshold for requestId %d from broker %s",
Review Comment:
This error message should include the threshold value to help with
investigation, debugging, and tuning.
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java:
##########
@@ -1654,6 +1669,24 @@ private long setQueryTimeout(String tableNameWithType,
Map<String, String> query
return remainingTimeMs;
}
+ /**
+ * Sets a query option indicating the maximum response size that can be sent
from a server to the broker. This size
+ * is measured for the serialized response.
+ */
+ private void setServerSerializedResponseLength(int numServers, @Nullable
BrokerRequest brokerRequest) {
+ if (brokerRequest == null) {
+ return;
+ }
+
+ Map<String, String> queryOptions =
brokerRequest.getPinotQuery().getQueryOptions();
+ Long maxLength =
QueryOptionsUtils.getMaxSerializedResponseLengthPerServer(queryOptions);
+ if (maxLength == null && _queryMaxSerializedResponseBytes > 0) {
+ long maxLengthPerServer = _queryMaxSerializedResponseBytes / numServers;
Review Comment:
What happens if `numServers` is `0`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]