This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 43eb011c12 In DispatchablePlanMetadata, store worker id to server instance map (#11256)
43eb011c12 is described below
commit 43eb011c125a0d35bf1204fb8794293f66633870
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu Aug 3 12:05:36 2023 -0700
In DispatchablePlanMetadata, store worker id to server instance map (#11256)
---
.../MultiStageBrokerRequestHandler.java | 6 +-
.../pinot/query/planner/DispatchableSubPlan.java | 8 +-
.../planner/physical/DispatchablePlanContext.java | 57 +++++-----
.../planner/physical/DispatchablePlanMetadata.java | 63 +++--------
.../planner/physical/MailboxAssignmentVisitor.java | 125 ++++++++++-----------
.../planner/physical/PinotDispatchPlanner.java | 18 +--
.../colocated/GreedyShuffleRewriteVisitor.java | 43 ++++---
.../apache/pinot/query/routing/WorkerManager.java | 55 ++++-----
.../apache/pinot/query/QueryCompilationTest.java | 58 +++++-----
9 files changed, 192 insertions(+), 241 deletions(-)
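For context, the core of this change is a representation swap inside DispatchablePlanMetadata: instead of keeping a server-instance-to-worker-ids map plus a separate total worker count, each stage now keeps a single worker-id-to-server-instance map, from which the worker count and the per-server grouping can be derived. A minimal, self-contained sketch of that idea (simplified names and plain strings as stand-ins, not the actual Pinot classes):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WorkerAssignmentSketch {
  public static void main(String[] args) {
    // New representation: worker id -> server instance ("host:port" strings stand in for QueryServerInstance).
    Map<Integer, String> workerIdToServer = new HashMap<>();
    workerIdToServer.put(0, "localhost:1");
    workerIdToServer.put(1, "localhost:2");

    // The total worker count is simply the map size, so no separate counter field is needed.
    int totalWorkers = workerIdToServer.size();

    // The old server -> worker ids view can still be derived on demand where it is needed.
    Map<String, List<Integer>> serverToWorkerIds = new HashMap<>();
    workerIdToServer.forEach((workerId, server) ->
        serverToWorkerIds.computeIfAbsent(server, k -> new ArrayList<>()).add(workerId));

    System.out.println(totalWorkers + " workers grouped by server: " + serverToWorkerIds);
  }
}

The diffs below apply this representation throughout the planner, the worker manager, and the mailbox assignment.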
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 8bbf955015..f1fb0d5b94 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -20,7 +20,6 @@ package org.apache.pinot.broker.requesthandler;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -245,10 +244,9 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
       brokerResponse.setResultTable(queryResults);
       dispatchableSubPlan.getTableToUnavailableSegmentsMap().forEach(
-          (table, segmentList) -> brokerResponse.addToExceptions(
+          (tableName, unavailableSegments) -> brokerResponse.addToExceptions(
               new QueryProcessingException(QueryException.SERVER_SEGMENT_MISSING_ERROR_CODE,
-                  String.format("Some segments are unavailable for table %s, unavailable segments: [%s]", table,
-                      Arrays.toString(segmentList.toArray())))));
+                  String.format("Find unavailable segments: %s for table: %s", unavailableSegments, tableName))));
       for (Map.Entry<Integer, ExecutionStatsAggregator> entry : stageIdStatsMap.entrySet()) {
if (entry.getKey() == 0) {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/DispatchableSubPlan.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/DispatchableSubPlan.java
index 4f99820351..748c3ac362 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/DispatchableSubPlan.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/DispatchableSubPlan.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.query.planner;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -42,11 +41,10 @@ public class DispatchableSubPlan {
private final List<Pair<Integer, String>> _queryResultFields;
private final List<DispatchablePlanFragment> _queryStageList;
private final Set<String> _tableNames;
-
- private final Map<String, Collection<String>> _tableToUnavailableSegmentsMap;
+ private final Map<String, Set<String>> _tableToUnavailableSegmentsMap;
   public DispatchableSubPlan(List<Pair<Integer, String>> fields, List<DispatchablePlanFragment> queryStageList,
-      Set<String> tableNames, Map<String, Collection<String>> tableToUnavailableSegmentsMap) {
+      Set<String> tableNames, Map<String, Set<String>> tableToUnavailableSegmentsMap) {
_queryResultFields = fields;
_queryStageList = queryStageList;
_tableNames = tableNames;
@@ -81,7 +79,7 @@ public class DispatchableSubPlan {
* Get the table to unavailable segments map
* @return table to unavailable segments map
*/
- public Map<String, Collection<String>> getTableToUnavailableSegmentsMap() {
+ public Map<String, Set<String>> getTableToUnavailableSegmentsMap() {
return _tableToUnavailableSegmentsMap;
}
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
index c0fb98d9a5..4699014ad0 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
@@ -19,6 +19,7 @@
package org.apache.pinot.query.planner.physical;
import com.google.common.base.Preconditions;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -29,6 +30,7 @@ import org.apache.pinot.query.context.PlannerContext;
import org.apache.pinot.query.planner.DispatchablePlanFragment;
import org.apache.pinot.query.planner.PlanFragment;
import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.routing.MailboxMetadata;
import org.apache.pinot.query.routing.QueryServerInstance;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.routing.WorkerManager;
@@ -90,44 +92,49 @@ public class DispatchablePlanContext {
     DispatchablePlanFragment[] dispatchablePlanFragmentArray =
         new DispatchablePlanFragment[_dispatchablePlanStageRootMap.size()];
     createDispatchablePlanFragmentList(dispatchablePlanFragmentArray, subPlanRoot);
-    List<DispatchablePlanFragment> dispatchablePlanFragmentList = Arrays.asList(dispatchablePlanFragmentArray);
-    for (Map.Entry<Integer, DispatchablePlanMetadata> dispatchableEntry : _dispatchablePlanMetadataMap.entrySet()) {
-      DispatchablePlanMetadata dispatchablePlanMetadata = dispatchableEntry.getValue();
+    for (Map.Entry<Integer, DispatchablePlanMetadata> planMetadataEntry : _dispatchablePlanMetadataMap.entrySet()) {
+      int stageId = planMetadataEntry.getKey();
+      DispatchablePlanMetadata dispatchablePlanMetadata = planMetadataEntry.getValue();
       // construct each worker metadata
-      WorkerMetadata[] workerMetadataList = new WorkerMetadata[dispatchablePlanMetadata.getTotalWorkerCount()];
-      for (Map.Entry<QueryServerInstance, List<Integer>> queryServerEntry
-          : dispatchablePlanMetadata.getServerInstanceToWorkerIdMap().entrySet()) {
-        for (int workerId : queryServerEntry.getValue()) {
-          VirtualServerAddress virtualServerAddress = new VirtualServerAddress(queryServerEntry.getKey(), workerId);
-          WorkerMetadata.Builder builder = new WorkerMetadata.Builder();
-          builder.setVirtualServerAddress(virtualServerAddress);
-          if (dispatchablePlanMetadata.getScannedTables().size() == 1) {
-            builder.addTableSegmentsMap(dispatchablePlanMetadata.getWorkerIdToSegmentsMap().get(workerId));
-          }
-          builder.putAllMailBoxInfosMap(dispatchablePlanMetadata.getWorkerIdToMailBoxIdsMap().get(workerId));
-          workerMetadataList[workerId] = builder.build();
+      Map<Integer, QueryServerInstance> workerIdToServerInstanceMap =
+          dispatchablePlanMetadata.getWorkerIdToServerInstanceMap();
+      Map<Integer, Map<String, List<String>>> workerIdToSegmentsMap =
+          dispatchablePlanMetadata.getWorkerIdToSegmentsMap();
+      Map<Integer, Map<Integer, MailboxMetadata>> workerIdToMailboxesMap =
+          dispatchablePlanMetadata.getWorkerIdToMailboxesMap();
+      Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdsMap = new HashMap<>();
+      WorkerMetadata[] workerMetadataArray = new WorkerMetadata[workerIdToServerInstanceMap.size()];
+      for (Map.Entry<Integer, QueryServerInstance> serverEntry : workerIdToServerInstanceMap.entrySet()) {
+        int workerId = serverEntry.getKey();
+        QueryServerInstance queryServerInstance = serverEntry.getValue();
+        serverInstanceToWorkerIdsMap.computeIfAbsent(queryServerInstance, k -> new ArrayList<>()).add(workerId);
+        WorkerMetadata.Builder workerMetadataBuilder = new WorkerMetadata.Builder().setVirtualServerAddress(
+            new VirtualServerAddress(queryServerInstance, workerId));
+        if (workerIdToSegmentsMap != null) {
+          workerMetadataBuilder.addTableSegmentsMap(workerIdToSegmentsMap.get(workerId));
         }
+        workerMetadataBuilder.putAllMailBoxInfosMap(workerIdToMailboxesMap.get(workerId));
+        workerMetadataArray[workerId] = workerMetadataBuilder.build();
       }
       // set the stageMetadata
-      int stageId = dispatchableEntry.getKey();
-      dispatchablePlanFragmentList.get(stageId).setWorkerMetadataList(Arrays.asList(workerMetadataList));
-      dispatchablePlanFragmentList.get(stageId)
-          .setWorkerIdToSegmentsMap(dispatchablePlanMetadata.getWorkerIdToSegmentsMap());
-      dispatchablePlanFragmentList.get(stageId)
-          .setServerInstanceToWorkerIdMap(dispatchablePlanMetadata.getServerInstanceToWorkerIdMap());
+      DispatchablePlanFragment dispatchablePlanFragment = dispatchablePlanFragmentArray[stageId];
+      dispatchablePlanFragment.setWorkerMetadataList(Arrays.asList(workerMetadataArray));
+      if (workerIdToSegmentsMap != null) {
+        dispatchablePlanFragment.setWorkerIdToSegmentsMap(workerIdToSegmentsMap);
+      }
+      dispatchablePlanFragment.setServerInstanceToWorkerIdMap(serverInstanceToWorkerIdsMap);
       Preconditions.checkState(dispatchablePlanMetadata.getScannedTables().size() <= 1,
           "More than one table is not supported yet");
       if (dispatchablePlanMetadata.getScannedTables().size() == 1) {
-        dispatchablePlanFragmentList.get(stageId).setTableName(dispatchablePlanMetadata.getScannedTables().get(0));
+        dispatchablePlanFragment.setTableName(dispatchablePlanMetadata.getScannedTables().get(0));
       }
       if (dispatchablePlanMetadata.getTimeBoundaryInfo() != null) {
-        dispatchablePlanFragmentList.get(stageId)
-            .setTimeBoundaryInfo(dispatchablePlanMetadata.getTimeBoundaryInfo());
+        dispatchablePlanFragment.setTimeBoundaryInfo(dispatchablePlanMetadata.getTimeBoundaryInfo());
       }
     }
-    return dispatchablePlanFragmentList;
+    return Arrays.asList(dispatchablePlanFragmentArray);
   }
   private void createDispatchablePlanFragmentList(DispatchablePlanFragment[] dispatchablePlanFragmentArray,
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
index f53cc008f8..abe4f64a46 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.query.routing.MailboxMetadata;
@@ -47,7 +48,7 @@ public class DispatchablePlanMetadata implements Serializable {
private Map<String, String> _tableOptions;
// used for assigning server/worker nodes.
- private Map<QueryServerInstance, List<Integer>> _serverInstanceToWorkerIdMap;
+ private Map<Integer, QueryServerInstance> _workerIdToServerInstanceMap;
// used for table scan stage - we use ServerInstance instead of VirtualServer
  // here because all virtual servers that share a server instance will have the
@@ -59,7 +60,7 @@ public class DispatchablePlanMetadata implements Serializable {
   private final Map<Integer, Map<Integer, MailboxMetadata>> _workerIdToMailboxesMap;
   // used for tracking unavailable segments from routing table, then assemble missing segments exception.
- private final Map<String, Collection<String>> _tableToUnavailableSegmentsMap;
+ private final Map<String, Set<String>> _tableToUnavailableSegmentsMap;
// time boundary info
private TimeBoundaryInfo _timeBoundaryInfo;
@@ -70,13 +71,8 @@ public class DispatchablePlanMetadata implements Serializable {
// whether a stage is partitioned table scan
private boolean _isPartitionedTableScan;
- // Total worker count of this stage.
- private int _totalWorkerCount;
-
public DispatchablePlanMetadata() {
_scannedTables = new ArrayList<>();
- _serverInstanceToWorkerIdMap = new HashMap<>();
- _workerIdToSegmentsMap = new HashMap<>();
_workerIdToMailboxesMap = new HashMap<>();
_tableToUnavailableSegmentsMap = new HashMap<>();
}
@@ -102,6 +98,15 @@ public class DispatchablePlanMetadata implements Serializable {
// attached physical plan context.
// -----------------------------------------------
+ public Map<Integer, QueryServerInstance> getWorkerIdToServerInstanceMap() {
+ return _workerIdToServerInstanceMap;
+ }
+
+  public void setWorkerIdToServerInstanceMap(Map<Integer, QueryServerInstance> workerIdToServerInstanceMap) {
+ _workerIdToServerInstanceMap = workerIdToServerInstanceMap;
+ }
+
+ @Nullable
public Map<Integer, Map<String, List<String>>> getWorkerIdToSegmentsMap() {
return _workerIdToSegmentsMap;
}
@@ -110,27 +115,10 @@ public class DispatchablePlanMetadata implements Serializable {
_workerIdToSegmentsMap = workerIdToSegmentsMap;
}
-  public Map<Integer, Map<Integer, MailboxMetadata>> getWorkerIdToMailBoxIdsMap() {
+  public Map<Integer, Map<Integer, MailboxMetadata>> getWorkerIdToMailboxesMap() {
return _workerIdToMailboxesMap;
}
-  public void setWorkerIdToMailBoxIdsMap(Map<Integer, Map<Integer, MailboxMetadata>> workerIdToMailboxesMap) {
- _workerIdToMailboxesMap.putAll(workerIdToMailboxesMap);
- }
-
- public void addWorkerIdToMailBoxIdsMap(int planFragmentId,
- Map<Integer, MailboxMetadata> planFragmentIdToMailboxesMap) {
- _workerIdToMailboxesMap.put(planFragmentId, planFragmentIdToMailboxesMap);
- }
-
-  public Map<QueryServerInstance, List<Integer>> getServerInstanceToWorkerIdMap() {
- return _serverInstanceToWorkerIdMap;
- }
-
-  public void setServerInstanceToWorkerIdMap(Map<QueryServerInstance, List<Integer>> serverInstances) {
- _serverInstanceToWorkerIdMap = serverInstances;
- }
-
public TimeBoundaryInfo getTimeBoundaryInfo() {
return _timeBoundaryInfo;
}
@@ -155,30 +143,11 @@ public class DispatchablePlanMetadata implements Serializable {
_isPartitionedTableScan = isPartitionedTableScan;
}
- public int getTotalWorkerCount() {
- return _totalWorkerCount;
- }
-
- public void setTotalWorkerCount(int totalWorkerCount) {
- _totalWorkerCount = totalWorkerCount;
- }
-
-  public void addTableToUnavailableSegmentsMap(String table, Collection<String> unavailableSegments) {
- if (!_tableToUnavailableSegmentsMap.containsKey(table)) {
- _tableToUnavailableSegmentsMap.put(table, new HashSet<>());
- }
- _tableToUnavailableSegmentsMap.get(table).addAll(unavailableSegments);
- }
-
- public Map<String, Collection<String>> getTableToUnavailableSegmentsMap() {
+ public Map<String, Set<String>> getTableToUnavailableSegmentsMap() {
return _tableToUnavailableSegmentsMap;
}
- @Override
- public String toString() {
-    return "DispatchablePlanMetadata{" + "_scannedTables=" + _scannedTables + ", _serverInstanceToWorkerIdMap="
-        + _serverInstanceToWorkerIdMap + ", _workerIdToSegmentsMap=" + _workerIdToSegmentsMap
-        + ", _workerIdToMailboxesMap=" + _workerIdToMailboxesMap + ", _tableToUnavailableSegmentsMap="
-        + _tableToUnavailableSegmentsMap + ", _timeBoundaryInfo=" + _timeBoundaryInfo + '}';
+  public void addUnavailableSegments(String tableName, Collection<String> unavailableSegments) {
+    _tableToUnavailableSegmentsMap.computeIfAbsent(tableName, k -> new HashSet<>()).addAll(unavailableSegments);
}
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
index bc6cb7c83e..a6d758335d 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
@@ -18,9 +18,9 @@
*/
package org.apache.pinot.query.planner.physical;
+import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.query.planner.plannode.DefaultPostOrderTraversalVisitor;
@@ -43,79 +43,76 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V
      Map<Integer, DispatchablePlanMetadata> metadataMap = context.getDispatchablePlanMetadataMap();
      DispatchablePlanMetadata senderMetadata = metadataMap.get(senderFragmentId);
      DispatchablePlanMetadata receiverMetadata = metadataMap.get(receiverFragmentId);
-      Map<QueryServerInstance, List<Integer>> senderWorkerIdsMap = senderMetadata.getServerInstanceToWorkerIdMap();
-      Map<QueryServerInstance, List<Integer>> receiverWorkerIdsMap = receiverMetadata.getServerInstanceToWorkerIdMap();
-      Map<Integer, Map<Integer, MailboxMetadata>> senderMailboxesMap = senderMetadata.getWorkerIdToMailBoxIdsMap();
-      Map<Integer, Map<Integer, MailboxMetadata>> receiverMailboxesMap = receiverMetadata.getWorkerIdToMailBoxIdsMap();
+      Map<Integer, QueryServerInstance> senderServerMap = senderMetadata.getWorkerIdToServerInstanceMap();
+      Map<Integer, QueryServerInstance> receiverServerMap = receiverMetadata.getWorkerIdToServerInstanceMap();
+      Map<Integer, Map<Integer, MailboxMetadata>> senderMailboxesMap = senderMetadata.getWorkerIdToMailboxesMap();
+      Map<Integer, Map<Integer, MailboxMetadata>> receiverMailboxesMap = receiverMetadata.getWorkerIdToMailboxesMap();
+      int numSenders = senderServerMap.size();
+      int numReceivers = receiverServerMap.size();
      if (sendNode.getDistributionType() == RelDistribution.Type.SINGLETON) {
        // For SINGLETON exchange type, send the data to the same instance (same worker id)
-        senderWorkerIdsMap.forEach((serverInstance, workerIds) -> {
-          for (int workerId : workerIds) {
-            MailboxMetadata mailboxMetadata = new MailboxMetadata(Collections.singletonList(
-                MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId)),
-                Collections.singletonList(new VirtualServerAddress(serverInstance, workerId)), Collections.emptyMap());
-            senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(receiverFragmentId, mailboxMetadata);
-            receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(senderFragmentId, mailboxMetadata);
-          }
-        });
+        Preconditions.checkState(numSenders == numReceivers,
+            "Got different number of workers for SINGLETON distribution type, sender: %s, receiver: %s", numSenders,
+            numReceivers);
+        for (int workerId = 0; workerId < numSenders; workerId++) {
+          QueryServerInstance senderServer = senderServerMap.get(workerId);
+          QueryServerInstance receiverServer = receiverServerMap.get(workerId);
+          Preconditions.checkState(senderServer.equals(receiverServer),
+              "Got different server for SINGLETON distribution type for worker id: %s, sender: %s, receiver: %s",
+              workerId, senderServer, receiverServer);
+          MailboxMetadata mailboxMetadata = new MailboxMetadata(Collections.singletonList(
+              MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId)),
+              Collections.singletonList(new VirtualServerAddress(senderServer, workerId)), Collections.emptyMap());
+          senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(receiverFragmentId, mailboxMetadata);
+          receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(senderFragmentId, mailboxMetadata);
+        }
      } else if (senderMetadata.isPartitionedTableScan()) {
        // For partitioned table scan, send the data to the worker with the same worker id (not necessary the same
        // instance)
        // TODO: Support further split the single partition into multiple workers
-        senderWorkerIdsMap.forEach((senderServerInstance, senderWorkerIds) -> {
-          for (int workerId : senderWorkerIds) {
-            receiverWorkerIdsMap.forEach((receiverServerInstance, receiverWorkerIds) -> {
-              for (int receiverWorkerId : receiverWorkerIds) {
-                if (receiverWorkerId == workerId) {
-                  String mailboxId =
-                      MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId);
-                  MailboxMetadata serderMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
-                      Collections.singletonList(new VirtualServerAddress(receiverServerInstance, workerId)),
-                      Collections.emptyMap());
-                  MailboxMetadata receiverMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
-                      Collections.singletonList(new VirtualServerAddress(senderServerInstance, workerId)),
-                      Collections.emptyMap());
-                  senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
-                      .put(receiverFragmentId, serderMailboxMetadata);
-                  receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
-                      .put(senderFragmentId, receiverMailboxMetadata);
-                  break;
-                }
-              }
-            });
-          }
-        });
+        Preconditions.checkState(numSenders == numReceivers,
+            "Got different number of workers for partitioned table scan, sender: %s, receiver: %s", numSenders,
+            numReceivers);
+        for (int workerId = 0; workerId < numSenders; workerId++) {
+          String mailboxId = MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId);
+          MailboxMetadata serderMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
+              Collections.singletonList(new VirtualServerAddress(receiverServerMap.get(workerId), workerId)),
+              Collections.emptyMap());
+          MailboxMetadata receiverMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
+              Collections.singletonList(new VirtualServerAddress(senderServerMap.get(workerId), workerId)),
+              Collections.emptyMap());
+          senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
+              .put(receiverFragmentId, serderMailboxMetadata);
+          receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
+              .put(senderFragmentId, receiverMailboxMetadata);
+        }
      } else {
        // For other exchange types, send the data to all the instances in the receiver fragment
-        // TODO:
-        //   1. Add support for more exchange types
-        //   2. Keep the receiver worker id sequential in the senderMailboxMetadata so that the partitionId aligns with
-        //      the workerId. It is useful for JOIN query when only left table is partitioned.
-        senderWorkerIdsMap.forEach((senderServerInstance, senderWorkerIds) -> {
-          for (int senderWorkerId : senderWorkerIds) {
-            Map<Integer, MailboxMetadata> senderMailboxMetadataMap =
-                senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>());
-            receiverWorkerIdsMap.forEach((receiverServerInstance, receiverWorkerIds) -> {
-              for (int receiverWorkerId : receiverWorkerIds) {
-                Map<Integer, MailboxMetadata> receiverMailboxMetadataMap =
-                    receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>());
-                String mailboxId = MailboxIdUtils.toPlanMailboxId(senderFragmentId, senderWorkerId, receiverFragmentId,
-                    receiverWorkerId);
-                MailboxMetadata senderMailboxMetadata =
-                    senderMailboxMetadataMap.computeIfAbsent(receiverFragmentId, k -> new MailboxMetadata());
-                senderMailboxMetadata.getMailBoxIdList().add(mailboxId);
-                senderMailboxMetadata.getVirtualAddressList()
-                    .add(new VirtualServerAddress(receiverServerInstance, receiverWorkerId));
-                MailboxMetadata receiverMailboxMetadata =
-                    receiverMailboxMetadataMap.computeIfAbsent(senderFragmentId, k -> new MailboxMetadata());
-                receiverMailboxMetadata.getMailBoxIdList().add(mailboxId);
-                receiverMailboxMetadata.getVirtualAddressList()
-                    .add(new VirtualServerAddress(senderServerInstance, senderWorkerId));
-              }
-            });
+        // NOTE: Keep the receiver worker id sequential in the senderMailboxMetadata so that the partitionId aligns with
+        //       the workerId. It is useful for JOIN query when only left table is partitioned.
+        // TODO: Add support for more exchange types
+        for (int senderWorkerId = 0; senderWorkerId < numSenders; senderWorkerId++) {
+          VirtualServerAddress senderAddress =
+              new VirtualServerAddress(senderServerMap.get(senderWorkerId), senderWorkerId);
+          MailboxMetadata senderMailboxMetadata = new MailboxMetadata();
+          senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>())
+              .put(receiverFragmentId, senderMailboxMetadata);
+          for (int receiverWorkerId = 0; receiverWorkerId < numReceivers; receiverWorkerId++) {
+            VirtualServerAddress receiverAddress =
+                new VirtualServerAddress(receiverServerMap.get(receiverWorkerId), receiverWorkerId);
+            String mailboxId =
+                MailboxIdUtils.toPlanMailboxId(senderFragmentId, senderWorkerId, receiverFragmentId, receiverWorkerId);
+            senderMailboxMetadata.getMailBoxIdList().add(mailboxId);
+            senderMailboxMetadata.getVirtualAddressList().add(receiverAddress);
+
+            MailboxMetadata receiverMailboxMetadata =
+                receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>())
+                    .computeIfAbsent(senderFragmentId, k -> new MailboxMetadata());
+            receiverMailboxMetadata.getMailBoxIdList().add(mailboxId);
+            receiverMailboxMetadata.getVirtualAddressList().add(senderAddress);
}
- });
+ }
}
}
return null;
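For the non-SINGLETON branch above, every sender worker is paired with every receiver worker, and both sides record the same mailbox id in receiver-worker order so partition ids align with worker ids. A rough, self-contained sketch of that pairing, assuming an illustrative mailbox-id format (the real ids come from MailboxIdUtils.toPlanMailboxId):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MailboxPairingSketch {
  // Illustrative mailbox id; the actual format is produced by MailboxIdUtils.toPlanMailboxId.
  static String mailboxId(int senderFragment, int senderWorker, int receiverFragment, int receiverWorker) {
    return senderFragment + "|" + senderWorker + "|" + receiverFragment + "|" + receiverWorker;
  }

  public static void main(String[] args) {
    int senderFragmentId = 2;
    int receiverFragmentId = 1;
    int numSenders = 2;
    int numReceivers = 2;

    // sender worker id -> mailbox ids it sends to, kept in receiver-worker order
    Map<Integer, List<String>> senderMailboxes = new HashMap<>();
    // receiver worker id -> mailbox ids it receives from, kept in sender-worker order
    Map<Integer, List<String>> receiverMailboxes = new HashMap<>();

    for (int s = 0; s < numSenders; s++) {
      for (int r = 0; r < numReceivers; r++) {
        String id = mailboxId(senderFragmentId, s, receiverFragmentId, r);
        senderMailboxes.computeIfAbsent(s, k -> new ArrayList<>()).add(id);
        receiverMailboxes.computeIfAbsent(r, k -> new ArrayList<>()).add(id);
      }
    }
    System.out.println("senders: " + senderMailboxes);
    System.out.println("receivers: " + receiverMailboxes);
  }
}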
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java
index 35c0a99ef4..d1618f2fc0 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java
@@ -18,10 +18,10 @@
*/
package org.apache.pinot.query.planner.physical;
-import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.query.context.PlannerContext;
import org.apache.pinot.query.planner.DispatchableSubPlan;
@@ -92,18 +92,12 @@ public class PinotDispatchPlanner {
populateTableUnavailableSegments(dispatchablePlanContext.getDispatchablePlanMetadataMap()));
}
-  private static Map<String, Collection<String>> populateTableUnavailableSegments(
+  private static Map<String, Set<String>> populateTableUnavailableSegments(
       Map<Integer, DispatchablePlanMetadata> dispatchablePlanMetadataMap) {
-    Map<String, Collection<String>> tableToUnavailableSegments = new HashMap<>();
-    dispatchablePlanMetadataMap.values()
-        .forEach(dispatchablePlanMetadata -> dispatchablePlanMetadata.getTableToUnavailableSegmentsMap().forEach(
-            (table, segments) -> {
-              if (!tableToUnavailableSegments.containsKey(table)) {
-                tableToUnavailableSegments.put(table, new HashSet<>());
-              }
-              tableToUnavailableSegments.get(table).addAll(segments);
-            }
-        ));
+    Map<String, Set<String>> tableToUnavailableSegments = new HashMap<>();
+    dispatchablePlanMetadataMap.values().forEach(metadata -> metadata.getTableToUnavailableSegmentsMap().forEach(
+        (tableName, unavailableSegments) -> tableToUnavailableSegments.computeIfAbsent(tableName, k -> new HashSet<>())
+            .addAll(unavailableSegments)));
return tableToUnavailableSegments;
}
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
index 50f78e64bf..287db51daa 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
@@ -125,10 +125,8 @@ public class GreedyShuffleRewriteVisitor implements PlanNodeVisitor<Set<Colocati
     boolean canColocate = canJoinBeColocated(node);
     // Step-2: Only if the servers assigned to both left and right nodes are equal and the servers assigned to the join
     // stage are a superset of those servers, can we skip shuffles.
-    canColocate =
-        canColocate && canServerAssignmentAllowShuffleSkip(node.getPlanFragmentId(),
-            innerLeafNodes.get(0).getSenderStageId(),
-            innerLeafNodes.get(1).getSenderStageId());
+    canColocate = canColocate && canServerAssignmentAllowShuffleSkip(node.getPlanFragmentId(),
+        innerLeafNodes.get(0).getSenderStageId(), innerLeafNodes.get(1).getSenderStageId());
     // Step-3: For both left/right MailboxReceiveNode/MailboxSendNode pairs, check whether the key partitioning can
     // allow shuffle skip.
     canColocate = canColocate && partitionKeyConditionForJoin(innerLeafNodes.get(0),
@@ -140,8 +138,8 @@ public class GreedyShuffleRewriteVisitor implements PlanNodeVisitor<Set<Colocati
     canColocate = canColocate && checkPartitionScheme(innerLeafNodes.get(0), innerLeafNodes.get(1), context);
     if (canColocate) {
       // If shuffle can be skipped, reassign servers.
-      _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).setServerInstanceToWorkerIdMap(
-          _dispatchablePlanMetadataMap.get(innerLeafNodes.get(0).getSenderStageId()).getServerInstanceToWorkerIdMap());
+      _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).setWorkerIdToServerInstanceMap(
+          _dispatchablePlanMetadataMap.get(innerLeafNodes.get(0).getSenderStageId()).getWorkerIdToServerInstanceMap());
_canSkipShuffleForJoin = true;
}
@@ -174,13 +172,13 @@ public class GreedyShuffleRewriteVisitor implements PlanNodeVisitor<Set<Colocati
     } else if (colocationKeyCondition(oldColocationKeys, selector) && areServersSuperset(node.getPlanFragmentId(),
         node.getSenderStageId())) {
       node.setDistributionType(RelDistribution.Type.SINGLETON);
-      _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).setServerInstanceToWorkerIdMap(
-          _dispatchablePlanMetadataMap.get(node.getSenderStageId()).getServerInstanceToWorkerIdMap());
+      _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).setWorkerIdToServerInstanceMap(
+          _dispatchablePlanMetadataMap.get(node.getSenderStageId()).getWorkerIdToServerInstanceMap());
       return oldColocationKeys;
     }
     // This means we can't skip shuffle and there's a partitioning enforced by receiver.
-    int numPartitions =
-        _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).getServerInstanceToWorkerIdMap().size();
+    int numPartitions = new HashSet<>(
+        _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).getWorkerIdToServerInstanceMap().values()).size();
     List<ColocationKey> colocationKeys = ((FieldSelectionKeySelector) selector).getColumnIndices().stream()
         .map(x -> new ColocationKey(x, numPartitions, selector.hashAlgorithm())).collect(Collectors.toList());
return new HashSet<>(colocationKeys);
@@ -196,8 +194,8 @@ public class GreedyShuffleRewriteVisitor implements PlanNodeVisitor<Set<Colocati
       return new HashSet<>();
     }
     // This means we can't skip shuffle and there's a partitioning enforced by receiver.
-    int numPartitions =
-        _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).getServerInstanceToWorkerIdMap().size();
+    int numPartitions = new HashSet<>(
+        _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).getWorkerIdToServerInstanceMap().values()).size();
     List<ColocationKey> colocationKeys = ((FieldSelectionKeySelector) selector).getColumnIndices().stream()
         .map(x -> new ColocationKey(x, numPartitions, selector.hashAlgorithm())).collect(Collectors.toList());
return new HashSet<>(colocationKeys);
@@ -307,8 +305,9 @@ public class GreedyShuffleRewriteVisitor implements PlanNodeVisitor<Set<Colocati
    * Checks if servers assigned to the receiver stage are a super-set of the sender stage.
    */
   private boolean areServersSuperset(int receiverStageId, int senderStageId) {
-    return _dispatchablePlanMetadataMap.get(receiverStageId).getServerInstanceToWorkerIdMap().keySet()
-        .containsAll(_dispatchablePlanMetadataMap.get(senderStageId).getServerInstanceToWorkerIdMap().keySet());
+    return new HashSet<>(
+        _dispatchablePlanMetadataMap.get(receiverStageId).getWorkerIdToServerInstanceMap().values()).containsAll(
+        _dispatchablePlanMetadataMap.get(senderStageId).getWorkerIdToServerInstanceMap().values());
}
/*
@@ -317,15 +316,15 @@ public class GreedyShuffleRewriteVisitor implements PlanNodeVisitor<Set<Colocati
    * 2. Servers assigned to the join-stage are a superset of S.
    */
   private boolean canServerAssignmentAllowShuffleSkip(int currentStageId, int leftStageId, int rightStageId) {
-    Set<QueryServerInstance> leftServerInstances = new HashSet<>(_dispatchablePlanMetadataMap.get(leftStageId)
-        .getServerInstanceToWorkerIdMap().keySet());
-    Set<QueryServerInstance> rightServerInstances = _dispatchablePlanMetadataMap.get(rightStageId)
-        .getServerInstanceToWorkerIdMap().keySet();
-    Set<QueryServerInstance> currentServerInstances = _dispatchablePlanMetadataMap.get(currentStageId)
-        .getServerInstanceToWorkerIdMap().keySet();
+    Set<QueryServerInstance> leftServerInstances =
+        new HashSet<>(_dispatchablePlanMetadataMap.get(leftStageId).getWorkerIdToServerInstanceMap().values());
+    Set<QueryServerInstance> rightServerInstances =
+        new HashSet<>(_dispatchablePlanMetadataMap.get(rightStageId).getWorkerIdToServerInstanceMap().values());
+    Set<QueryServerInstance> currentServerInstances =
+        new HashSet<>(_dispatchablePlanMetadataMap.get(currentStageId).getWorkerIdToServerInstanceMap().values());
     return leftServerInstances.containsAll(rightServerInstances)
-        && leftServerInstances.size() == rightServerInstances.size()
-        && currentServerInstances.containsAll(leftServerInstances);
+        && leftServerInstances.size() == rightServerInstances.size() && currentServerInstances.containsAll(
+        leftServerInstances);
}
/**
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 5fdb67c7c8..a57356cb33 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -73,9 +73,8 @@ public class WorkerManager {
    // ROOT stage doesn't have a QueryServer as it is strictly only reducing results, so here we simply assign the
    // worker instance with identical server/mailbox port number.
    DispatchablePlanMetadata metadata = context.getDispatchablePlanMetadataMap().get(0);
-    metadata.setServerInstanceToWorkerIdMap(
-        Collections.singletonMap(new QueryServerInstance(_hostName, _port, _port), Collections.singletonList(0)));
-    metadata.setTotalWorkerCount(1);
+    metadata.setWorkerIdToServerInstanceMap(
+        Collections.singletonMap(0, new QueryServerInstance(_hostName, _port, _port)));
for (PlanFragment child : rootFragment.getChildren()) {
assignWorkersToNonRootFragment(child, context);
}
@@ -156,21 +155,19 @@ public class WorkerManager {
// attach unavailable segments to metadata
if (!routingTable.getUnavailableSegments().isEmpty()) {
-        metadata.addTableToUnavailableSegmentsMap(tableName, routingTable.getUnavailableSegments());
+        metadata.addUnavailableSegments(tableName, routingTable.getUnavailableSegments());
      }
    }
-    int globalIdx = 0;
-    Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdMap = new HashMap<>();
+    int workerId = 0;
+    Map<Integer, QueryServerInstance> workerIdToServerInstanceMap = new HashMap<>();
    Map<Integer, Map<String, List<String>>> workerIdToSegmentsMap = new HashMap<>();
    for (Map.Entry<ServerInstance, Map<String, List<String>>> entry : serverInstanceToSegmentsMap.entrySet()) {
-      QueryServerInstance queryServerInstance = new QueryServerInstance(entry.getKey());
-      serverInstanceToWorkerIdMap.put(queryServerInstance, Collections.singletonList(globalIdx));
-      workerIdToSegmentsMap.put(globalIdx, entry.getValue());
-      globalIdx++;
+      workerIdToServerInstanceMap.put(workerId, new QueryServerInstance(entry.getKey()));
+ workerIdToSegmentsMap.put(workerId, entry.getValue());
+ workerId++;
}
- metadata.setServerInstanceToWorkerIdMap(serverInstanceToWorkerIdMap);
+ metadata.setWorkerIdToServerInstanceMap(workerIdToServerInstanceMap);
metadata.setWorkerIdToSegmentsMap(workerIdToSegmentsMap);
- metadata.setTotalWorkerCount(globalIdx);
}
/**
@@ -219,8 +216,8 @@ public class WorkerManager {
// segments for the same partition is colocated
long indexToPick = context.getRequestId();
    ColocatedPartitionInfo[] partitionInfoMap = colocatedTableInfo._partitionInfoMap;
-    int nextWorkerId = 0;
-    Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdMap = new HashMap<>();
+    int workerId = 0;
+    Map<Integer, QueryServerInstance> workedIdToServerInstanceMap = new HashMap<>();
    Map<Integer, Map<String, List<String>>> workerIdToSegmentsMap = new HashMap<>();
    Map<String, ServerInstance> enabledServerInstanceMap = _routingManager.getEnabledServerInstanceMap();
for (int i = 0; i < numPartitions; i++) {
@@ -233,15 +230,12 @@ public class WorkerManager {
          pickEnabledServer(partitionInfo._fullyReplicatedServers, enabledServerInstanceMap, indexToPick++);
      Preconditions.checkState(serverInstance != null,
          "Failed to find enabled fully replicated server for table: %s, partition: %s in table: %s", tableName, i);
-      QueryServerInstance queryServerInstance = new QueryServerInstance(serverInstance);
-      int workerId = nextWorkerId++;
-      serverInstanceToWorkerIdMap.computeIfAbsent(queryServerInstance, k -> new ArrayList<>()).add(workerId);
+      workedIdToServerInstanceMap.put(workerId, new QueryServerInstance(serverInstance));
workerIdToSegmentsMap.put(workerId, getSegmentsMap(partitionInfo));
+ workerId++;
}
-
- metadata.setServerInstanceToWorkerIdMap(serverInstanceToWorkerIdMap);
+ metadata.setWorkerIdToServerInstanceMap(workedIdToServerInstanceMap);
metadata.setWorkerIdToSegmentsMap(workerIdToSegmentsMap);
- metadata.setTotalWorkerCount(nextWorkerId);
metadata.setTimeBoundaryInfo(colocatedTableInfo._timeBoundaryInfo);
metadata.setPartitionedTableScan(true);
}
@@ -260,8 +254,7 @@ public class WorkerManager {
if (children.size() > 0) {
      DispatchablePlanMetadata firstChildMetadata = metadataMap.get(children.get(0).getFragmentId());
      if (firstChildMetadata.isPartitionedTableScan()) {
-        metadata.setServerInstanceToWorkerIdMap(firstChildMetadata.getServerInstanceToWorkerIdMap());
-        metadata.setTotalWorkerCount(firstChildMetadata.getTotalWorkerCount());
+        metadata.setWorkerIdToServerInstanceMap(firstChildMetadata.getWorkerIdToServerInstanceMap());
return;
}
}
@@ -291,22 +284,18 @@ public class WorkerManager {
    int stageParallelism = Integer.parseInt(options.getOrDefault(QueryOptionKey.STAGE_PARALLELISM, "1"));
    if (metadata.isRequiresSingletonInstance()) {
      // require singleton should return a single global worker ID with 0;
-      ServerInstance serverInstance = serverInstances.get(RANDOM.nextInt(serverInstances.size()));
-      metadata.setServerInstanceToWorkerIdMap(
-          Collections.singletonMap(new QueryServerInstance(serverInstance), Collections.singletonList(0)));
-      metadata.setTotalWorkerCount(1);
+      metadata.setWorkerIdToServerInstanceMap(Collections.singletonMap(0,
+          new QueryServerInstance(serverInstances.get(RANDOM.nextInt(serverInstances.size())))));
    } else {
-      Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdMap = new HashMap<>();
-      int nextWorkerId = 0;
+      Map<Integer, QueryServerInstance> workerIdToServerInstanceMap = new HashMap<>();
+      int workerId = 0;
      for (ServerInstance serverInstance : serverInstances) {
-        List<Integer> workerIds = new ArrayList<>();
+        QueryServerInstance queryServerInstance = new QueryServerInstance(serverInstance);
        for (int i = 0; i < stageParallelism; i++) {
-          workerIds.add(nextWorkerId++);
+          workerIdToServerInstanceMap.put(workerId++, queryServerInstance);
        }
-        serverInstanceToWorkerIdMap.put(new QueryServerInstance(serverInstance), workerIds);
}
- metadata.setServerInstanceToWorkerIdMap(serverInstanceToWorkerIdMap);
- metadata.setTotalWorkerCount(nextWorkerId);
+ metadata.setWorkerIdToServerInstanceMap(workerIdToServerInstanceMap);
}
}
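For intermediate stages, the loop above hands each selected server stageParallelism consecutive worker ids. A hedged sketch of that assignment with placeholder server ids (plain strings instead of the actual ServerInstance/QueryServerInstance types):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StageParallelismSketch {
  public static void main(String[] args) {
    List<String> servers = List.of("Server_localhost_1", "Server_localhost_2");  // placeholder server ids
    int stageParallelism = 2;

    // worker id -> server, assigning consecutive worker ids per server
    Map<Integer, String> workerIdToServer = new HashMap<>();
    int workerId = 0;
    for (String server : servers) {
      for (int i = 0; i < stageParallelism; i++) {
        workerIdToServer.put(workerId++, server);
      }
    }
    System.out.println(workerIdToServer);  // {0=Server_localhost_1, 1=Server_localhost_1, 2=Server_localhost_2, 3=Server_localhost_2}
  }
}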
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
index 3015deea4b..e66a08a942 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
@@ -478,40 +478,40 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
return new Object[][] {
        new Object[]{"EXPLAIN IMPLEMENTATION PLAN INCLUDING ALL ATTRIBUTES FOR SELECT col1, col3 FROM a",
"[0]@localhost:3 MAIL_RECEIVE(RANDOM_DISTRIBUTED)\n"
-                + "├── [1]@localhost:2 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
- + "│ └── [1]@localhost:2 PROJECT\n"
- + "│ └── [1]@localhost:2 TABLE SCAN (a) null\n"
-                + "└── [1]@localhost:1 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
- + " └── [1]@localhost:1 PROJECT\n"
- + " └── [1]@localhost:1 TABLE SCAN (a) null\n"},
+                + "├── [1]@localhost:1 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
+ + "│ └── [1]@localhost:1 PROJECT\n"
+ + "│ └── [1]@localhost:1 TABLE SCAN (a) null\n"
+                + "└── [1]@localhost:2 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
+ + " └── [1]@localhost:2 PROJECT\n"
+ + " └── [1]@localhost:2 TABLE SCAN (a) null\n"},
new Object[]{"EXPLAIN IMPLEMENTATION PLAN EXCLUDING ATTRIBUTES FOR "
+ "SELECT col1, COUNT(*) FROM a GROUP BY col1",
"[0]@localhost:3 MAIL_RECEIVE(RANDOM_DISTRIBUTED)\n"
-                + "├── [1]@localhost:2 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n"
-                + "└── [1]@localhost:1 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
- + " └── [1]@localhost:1 AGGREGATE_FINAL\n"
- + " └── [1]@localhost:1 MAIL_RECEIVE(HASH_DISTRIBUTED)\n"
-                + " ├── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[0],[1]@localhost@{1,1}|[1]}\n"
- + " │ └── [2]@localhost:2 AGGREGATE_LEAF\n"
- + " │ └── [2]@localhost:2 TABLE SCAN (a) null\n"
-                + " └── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[0],[1]@localhost@{1,1}|[1]}\n"
- + " └── [2]@localhost:1 AGGREGATE_LEAF\n"
- + " └── [2]@localhost:1 TABLE SCAN (a) null\n"},
+                + "├── [1]@localhost:1 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n"
+                + "└── [1]@localhost:2 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
+ + " └── [1]@localhost:2 AGGREGATE_FINAL\n"
+ + " └── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n"
+                + " ├── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n"
+ + " │ └── [2]@localhost:1 AGGREGATE_LEAF\n"
+ + " │ └── [2]@localhost:1 TABLE SCAN (a) null\n"
+                + " └── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n"
+ + " └── [2]@localhost:2 AGGREGATE_LEAF\n"
+ + " └── [2]@localhost:2 TABLE SCAN (a) null\n"},
        new Object[]{"EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col1, b.col3 FROM a JOIN b ON a.col1 = b.col1",
"[0]@localhost:3 MAIL_RECEIVE(RANDOM_DISTRIBUTED)\n"
- + "├── [1]@localhost:2
MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n"
- + "└── [1]@localhost:1
MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
- + " └── [1]@localhost:1 PROJECT\n"
- + " └── [1]@localhost:1 JOIN\n"
- + " ├── [1]@localhost:1 MAIL_RECEIVE(HASH_DISTRIBUTED)\n"
- + " │ ├── [2]@localhost:2
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[0],[1]@localhost@{1,1}|[1]}\n"
- + " │ │ └── [2]@localhost:2 PROJECT\n"
- + " │ │ └── [2]@localhost:2 TABLE SCAN (a) null\n"
- + " │ └── [2]@localhost:1
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[0],[1]@localhost@{1,1}|[1]}\n"
- + " │ └── [2]@localhost:1 PROJECT\n"
- + " │ └── [2]@localhost:1 TABLE SCAN (a) null\n"
- + " └── [1]@localhost:1 MAIL_RECEIVE(HASH_DISTRIBUTED)\n"
- + " └── [3]@localhost:1
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[0],[1]@localhost@{1,1}|[1]}\n"
+ + "├── [1]@localhost:1
MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n"
+ + "└── [1]@localhost:2
MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
+ + " └── [1]@localhost:2 PROJECT\n"
+ + " └── [1]@localhost:2 JOIN\n"
+ + " ├── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n"
+                + " │ ├── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n"
+ + " │ │ └── [2]@localhost:1 PROJECT\n"
+ + " │ │ └── [2]@localhost:1 TABLE SCAN (a) null\n"
+                + " │ └── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n"
+ + " │ └── [2]@localhost:2 PROJECT\n"
+ + " │ └── [2]@localhost:2 TABLE SCAN (a) null\n"
+ + " └── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n"
+                + " └── [3]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n"
+ " └── [3]@localhost:1 PROJECT\n"
+ " └── [3]@localhost:1 TABLE SCAN (b) null\n"}
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]