jackjlli commented on code in PR #13742:
URL: https://github.com/apache/pinot/pull/13742#discussion_r1744379495
##########
pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java:
##########
@@ -50,21 +52,31 @@ public class AsyncQueryResponse implements QueryResponse {
private volatile ServerRoutingInstance _failedServer;
private volatile Exception _exception;
- public AsyncQueryResponse(QueryRouter queryRouter, long requestId, Set<ServerRoutingInstance> serversQueried,
- long startTimeMs, long timeoutMs, ServerRoutingStatsManager serverRoutingStatsManager) {
+ public AsyncQueryResponse(QueryRouter queryRouter, long requestId,
+// Set<ServerQueryRoutingContext> serverQueryRoutingContexts,
+ Map<ServerRoutingInstance, List<InstanceRequest>> requestMap, long startTimeMs, long timeoutMs,
+ ServerRoutingStatsManager serverRoutingStatsManager) {
_queryRouter = queryRouter;
_requestId = requestId;
- int numServersQueried = serversQueried.size();
- _responseMap = new ConcurrentHashMap<>(HashUtil.getHashMapCapacity(numServersQueried));
+ _responses = new ConcurrentHashMap<>();
_serverRoutingStatsManager = serverRoutingStatsManager;
- for (ServerRoutingInstance serverRoutingInstance : serversQueried) {
- // Record stats related to query submission just before sending the request. Otherwise, if the response is
- // received immediately, there's a possibility of updating query response stats before updating query
- // submission stats.
- _serverRoutingStatsManager.recordStatsForQuerySubmission(requestId, serverRoutingInstance.getInstanceId());
- _responseMap.put(serverRoutingInstance, new ServerResponse(startTimeMs));
+ int numQueriesIssued = 0;
+ for (Map.Entry<ServerRoutingInstance, List<InstanceRequest>> serverRequests : requestMap.entrySet()) {
+ for (InstanceRequest request : serverRequests.getValue()) {
+ // Record stats related to query submission just before sending the request. Otherwise, if the response is
+ // received immediately, there's a possibility of updating query response stats before updating query
+ // submission stats.
+ _serverRoutingStatsManager.recordStatsForQuerySubmission(requestId, serverRequests.getKey().getInstanceId());
+
+ _responses.computeIfAbsent(serverRequests.getKey(), k -> new ConcurrentHashMap<>())
+ // we use query hash so that the same hash ID can be passed back from servers more easily than trying to
+ // instantiate a valid InstanceRequest obj and send its hash
+ .put(request.getQuery().getPinotQuery().hashCode(), new ServerResponse(startTimeMs));
Review Comment:
I see your point. Yes, it should be fine to use just the physical table name
as the key here, since each thread works on only one request. 👍
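
For illustration, keying the per-server map by physical table name instead of the query hash could look roughly like the sketch below. This is only a sketch under assumptions: `ResponseMapSketch`, `ResponseSlot`, and the plain `String` keys are hypothetical stand-ins rather than Pinot classes, and it assumes each request sent to a given server targets a distinct physical table (for example `myTable_OFFLINE` and `myTable_REALTIME`).

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not the actual AsyncQueryResponse implementation.
final class ResponseMapSketch {

  // Hypothetical stand-in for ServerResponse: only records the start time.
  static final class ResponseSlot {
    final long startTimeMs;

    ResponseSlot(long startTimeMs) {
      this.startTimeMs = startTimeMs;
    }
  }

  // Builds: server instance id -> physical table name -> response slot.
  // Assumes table names are unique per server, so no slot is overwritten.
  static Map<String, Map<String, ResponseSlot>> buildResponses(
      Map<String, List<String>> tablesByServer, long startTimeMs) {
    Map<String, Map<String, ResponseSlot>> responses = new ConcurrentHashMap<>();
    for (Map.Entry<String, List<String>> entry : tablesByServer.entrySet()) {
      for (String tableName : entry.getValue()) {
        responses.computeIfAbsent(entry.getKey(), k -> new ConcurrentHashMap<>())
            .put(tableName, new ResponseSlot(startTimeMs));
      }
    }
    return responses;
  }
}
```

With this shape, a server only needs to echo back the table name it served for the broker to find the matching slot, with no hash to recompute on either side.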
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]