This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c1a19ba43b2 [multistage] Add configurable hash function support for
KeySelector in query planning (#16242)
c1a19ba43b2 is described below
commit c1a19ba43b29bd49a5d91e60cf8b8dd3ac7703ca
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Sun Jul 6 14:28:26 2025 -0700
[multistage] Add configurable hash function support for KeySelector in
query planning (#16242)
---
.../MultiStageBrokerRequestHandler.java | 4 +
pinot-common/src/main/proto/plan.proto | 1 +
.../org/apache/pinot/query/QueryEnvironment.java | 14 ++-
.../planner/logical/PinotLogicalQueryPlanner.java | 12 +-
.../query/planner/logical/PlanFragmenter.java | 2 +-
.../planner/logical/RelToPlanNodeConverter.java | 8 +-
.../planner/partitioning/EmptyKeySelector.java | 19 +++
.../planner/partitioning/HashFunctionSelector.java | 130 +++++++++++++++++++++
.../planner/partitioning/KeySelectorFactory.java | 10 +-
.../partitioning/MultiColumnKeySelector.java | 21 ++--
.../partitioning/SingleColumnKeySelector.java | 13 ++-
.../physical/v2/PRelToPlanNodeConverter.java | 4 +-
.../pinot/query/planner/plannode/ExchangeNode.java | 17 ++-
.../query/planner/plannode/MailboxSendNode.java | 37 +++---
.../query/planner/serde/PlanNodeDeserializer.java | 7 +-
.../query/planner/serde/PlanNodeSerializer.java | 1 +
.../query/planner/logical/StagesTestBase.java | 5 +-
.../partitioning/HashFunctionSelectorTest.java | 125 ++++++++++++++++++++
.../partitioning/KeySelectorHashFunctionTest.java | 109 +++++++++++++++++
.../runtime/operator/MailboxSendOperator.java | 14 ++-
.../runtime/operator/exchange/BlockExchange.java | 12 +-
.../apache/pinot/spi/utils/CommonConstants.java | 4 +
22 files changed, 513 insertions(+), 56 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 81bfe277d8e..7a3c4055a5a 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -402,6 +402,9 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
int defaultLiteModeServerStageLimit = _config.getProperty(
CommonConstants.Broker.CONFIG_OF_LITE_MODE_LEAF_STAGE_LIMIT,
CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT);
+ String defaultHashFunction = _config.getProperty(
+ CommonConstants.Broker.CONFIG_OF_BROKER_DEFAULT_HASH_FUNCTION,
+ CommonConstants.Broker.DEFAULT_BROKER_DEFAULT_HASH_FUNCTION);
boolean caseSensitive = !_config.getProperty(
CommonConstants.Helix.ENABLE_CASE_INSENSITIVE_KEY,
CommonConstants.Helix.DEFAULT_ENABLE_CASE_INSENSITIVE
@@ -422,6 +425,7 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
.defaultRunInBroker(defaultRunInBroker)
.defaultUseBrokerPruning(defaultUseBrokerPruning)
.defaultLiteModeServerStageLimit(defaultLiteModeServerStageLimit)
+ .defaultHashFunction(defaultHashFunction)
.build();
}
diff --git a/pinot-common/src/main/proto/plan.proto
b/pinot-common/src/main/proto/plan.proto
index 4e4fbb16843..703abb96075 100644
--- a/pinot-common/src/main/proto/plan.proto
+++ b/pinot-common/src/main/proto/plan.proto
@@ -159,6 +159,7 @@ message MailboxSendNode {
repeated Collation collations = 6;
bool sort = 7;
repeated int32 receiverStageIds = 8;
+ string hashFunction = 9;
}
message ProjectNode {
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 68a73472d28..05be6cd8a9e 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -475,7 +475,8 @@ public class QueryEnvironment {
_envConfig.getWorkerManager(), requestId,
_envConfig.getTableCache());
return pinotDispatchPlanner.createDispatchableSubPlanV2(plan.getLeft(),
plan.getRight());
}
- SubPlan plan = PinotLogicalQueryPlanner.makePlan(relRoot, tracker,
useSpools(plannerContext.getOptions()));
+ SubPlan plan = PinotLogicalQueryPlanner.makePlan(relRoot, tracker,
useSpools(plannerContext.getOptions()),
+ _envConfig.defaultHashFunction());
PinotDispatchPlanner pinotDispatchPlanner =
new PinotDispatchPlanner(plannerContext,
_envConfig.getWorkerManager(), _envConfig.getRequestId(),
_envConfig.getTableCache());
@@ -757,6 +758,17 @@ public class QueryEnvironment {
return CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT;
}
+ /**
+ * Default hash function to use for KeySelector data shuffling.
+ *
+ * This is treated as the default value for the broker and it is expected
to be obtained from a Pinot configuration.
+ * This default value can always be overridden at query level by the query
option.
+ */
+ @Value.Default
+ default String defaultHashFunction() {
+ return CommonConstants.Broker.DEFAULT_BROKER_DEFAULT_HASH_FUNCTION;
+ }
+
/**
* Returns the worker manager.
*
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java
index 9bca603e3f4..1e2ddca9491 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java
@@ -60,10 +60,11 @@ public class PinotLogicalQueryPlanner {
* Converts a Calcite {@link RelRoot} into a Pinot {@link SubPlan}.
*/
public static SubPlan makePlan(RelRoot relRoot,
- @Nullable TransformationTracker.Builder<PlanNode, RelNode> tracker,
boolean useSpools) {
- PlanNode rootNode = new
RelToPlanNodeConverter(tracker).toPlanNode(relRoot.rel);
+ @Nullable TransformationTracker.Builder<PlanNode, RelNode> tracker,
boolean useSpools,
+ String hashFunction) {
+ PlanNode rootNode = new RelToPlanNodeConverter(tracker,
hashFunction).toPlanNode(relRoot.rel);
- PlanFragment rootFragment = planNodeToPlanFragment(rootNode, tracker,
useSpools);
+ PlanFragment rootFragment = planNodeToPlanFragment(rootNode, tracker,
useSpools, hashFunction);
return new SubPlan(rootFragment,
new
SubPlanMetadata(RelToPlanNodeConverter.getTableNamesFromRelRoot(relRoot.rel),
relRoot.fields), List.of());
@@ -108,7 +109,8 @@ public class PinotLogicalQueryPlanner {
}
private static PlanFragment planNodeToPlanFragment(
- PlanNode node, @Nullable TransformationTracker.Builder<PlanNode,
RelNode> tracker, boolean useSpools) {
+ PlanNode node, @Nullable TransformationTracker.Builder<PlanNode,
RelNode> tracker, boolean useSpools,
+ String hashFunction) {
PlanFragmenter fragmenter = new PlanFragmenter();
PlanFragmenter.Context fragmenterContext = fragmenter.createContext();
node = node.visit(fragmenter, fragmenterContext);
@@ -126,7 +128,7 @@ public class PinotLogicalQueryPlanner {
MailboxSendNode subPlanRootSenderNode =
new MailboxSendNode(node.getStageId(), node.getDataSchema(),
List.of(node), 0,
PinotRelExchangeType.getDefaultExchangeType(),
RelDistribution.Type.BROADCAST_DISTRIBUTED, null, false,
- null, false);
+ null, false, hashFunction);
PlanFragment planFragment1 = new PlanFragment(1, subPlanRootSenderNode,
new ArrayList<>());
planFragmentMap.put(1, planFragment1);
for (Int2ObjectMap.Entry<IntList> entry :
childPlanFragmentIdsMap.int2ObjectEntrySet()) {
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java
index 8ffc4c78f99..420cf505bf2 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java
@@ -186,7 +186,7 @@ public class PlanFragmenter implements
PlanNodeVisitor<PlanNode, PlanFragmenter.
MailboxSendNode mailboxSendNode =
new MailboxSendNode(senderPlanFragmentId,
nextPlanFragmentRoot.getDataSchema(), List.of(nextPlanFragmentRoot),
receiverPlanFragmentId, exchangeType, distributionType, keys,
node.isPrePartitioned(), node.getCollations(),
- node.isSortOnSender());
+ node.isSortOnSender(), node.getHashFunction());
_planFragmentMap.put(senderPlanFragmentId,
new PlanFragment(senderPlanFragmentId, mailboxSendNode, new
ArrayList<>()));
_mailboxSendToExchangeNodeMap.put(mailboxSendNode, node);
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
index c9526fa4884..abaf4704287 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
@@ -94,9 +94,12 @@ public final class RelToPlanNodeConverter {
private boolean _windowFunctionFound;
@Nullable
private final TransformationTracker.Builder<PlanNode, RelNode> _tracker;
+ private final String _hashFunction;
- public RelToPlanNodeConverter(@Nullable
TransformationTracker.Builder<PlanNode, RelNode> tracker) {
+ public RelToPlanNodeConverter(@Nullable
TransformationTracker.Builder<PlanNode, RelNode> tracker,
+ String hashFunction) {
_tracker = tracker;
+ _hashFunction = hashFunction;
}
/**
@@ -190,7 +193,8 @@ public final class RelToPlanNodeConverter {
}
}
return new ExchangeNode(DEFAULT_STAGE_ID, toDataSchema(node.getRowType()),
convertInputs(node.getInputs()),
- exchangeType, distributionType, keys, prePartitioned, collations,
sortOnSender, sortOnReceiver, null, null);
+ exchangeType, distributionType, keys, prePartitioned, collations,
sortOnSender, sortOnReceiver, null, null,
+ _hashFunction);
}
private SetOpNode convertLogicalSetOp(SetOp node) {
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/EmptyKeySelector.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/EmptyKeySelector.java
index fc02ff257b4..087d61ff11e 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/EmptyKeySelector.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/EmptyKeySelector.java
@@ -22,11 +22,25 @@ import javax.annotation.Nullable;
public class EmptyKeySelector implements KeySelector<Integer> {
+ private final String _hashFunction;
+
private EmptyKeySelector() {
+ this(KeySelector.DEFAULT_HASH_ALGORITHM);
+ }
+
+ private EmptyKeySelector(String hashFunction) {
+ _hashFunction = hashFunction;
}
public static final EmptyKeySelector INSTANCE = new EmptyKeySelector();
+ public static EmptyKeySelector getInstance(String hashFunction) {
+ if (KeySelector.DEFAULT_HASH_ALGORITHM.equals(hashFunction)) {
+ return INSTANCE;
+ }
+ return new EmptyKeySelector(hashFunction);
+ }
+
@Nullable
@Override
public Integer getKey(Object[] row) {
@@ -37,4 +51,9 @@ public class EmptyKeySelector implements KeySelector<Integer>
{
public int computeHash(Object[] input) {
return 0;
}
+
+ @Override
+ public String hashAlgorithm() {
+ return _hashFunction;
+ }
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/HashFunctionSelector.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/HashFunctionSelector.java
new file mode 100644
index 00000000000..6ab6ee7ccde
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/HashFunctionSelector.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.partitioning;
+
+import java.nio.charset.StandardCharsets;
+import org.apache.pinot.spi.utils.hash.MurmurHashFunctions;
+
+
+/**
+ * Utility class to compute hash values using different hash functions.
+ * This class provides consistent hash computation for KeySelector
implementations.
+ */
+public class HashFunctionSelector {
+ public static final String MURMUR2 = "murmur";
+ public static final String MURMUR3 = "murmur3";
+ public static final String HASH_CODE = "hashcode";
+
+ private HashFunctionSelector() {
+ }
+
+ /**
+ * Computes a hash code for a single value using the specified hash function.
+ * @param value The value to hash.
+ * @param hashFunction The hash function to use (e.g., "murmur", "murmur3",
"hashcode"); unrecognized values fall back to absolute hashCode.
+ * @return The computed hash code.
+ */
+ public static int computeHash(Object value, String hashFunction) {
+ if (value == null) {
+ return 0;
+ }
+
+ switch (hashFunction.toLowerCase()) {
+ case MURMUR2: return murmur2(value);
+ case MURMUR3: return murmur3(value);
+ // hashCode and absHashCode are treated the same for single hash.
+ case HASH_CODE:
+ // Default hash is absHashCode.
+ default: return absHashCode(value);
+ }
+ }
+
+ /**
+ * Computes a hash code for multiple values based on specified key IDs using
the specified hash function.
+ * This is useful for partitioning where only certain keys are relevant.
+ * @param values The array of values to hash.
+ * @param keyIds The array of key IDs indicating which values to include in
the hash computation.
+ * @param hashFunction The hash function to use (e.g., "murmur", "murmur3",
"hashcode"); unrecognized values fall back to hashCode.
+ * @return The computed hash code.
+ */
+ public static int computeMultiHash(Object[] values, int[] keyIds, String
hashFunction) {
+ if (values == null || values.length == 0) {
+ return 0;
+ }
+
+ switch (hashFunction.toLowerCase()) {
+ case MURMUR2: return murmur2(values, keyIds);
+ case MURMUR3: return murmur3(values, keyIds);
+ // hashCode and absHashCode are treated the same for multi hash.
+ case HASH_CODE:
+ // We should use hashCode instead of absHashCode for multi hash to
maintain consistency with legacy behavior.
+ default: return hashCode(values, keyIds);
+ }
+ }
+
+ private static int absHashCode(Object value) {
+ return value.hashCode() & Integer.MAX_VALUE;
+ }
+
+ private static int hashCode(Object value) {
+ return value.hashCode();
+ }
+
+ private static int murmur2(Object value) {
+ return MurmurHashFunctions.murmurHash2(toBytes(value)) & Integer.MAX_VALUE;
+ }
+
+ private static int murmur3(Object value) {
+ return MurmurHashFunctions.murmurHash3X64Bit32(toBytes(value), 0) &
Integer.MAX_VALUE;
+ }
+
+ private static int murmur2(Object[] values, int[] keyIds) {
+ int hash = 0;
+ for (int keyId : keyIds) {
+ if (keyId < values.length && values[keyId] != null) {
+ hash += murmur2(values[keyId]);
+ }
+ }
+ return hash & Integer.MAX_VALUE;
+ }
+
+ private static int murmur3(Object[] values, int[] keyIds) {
+ int hash = 0;
+ for (int keyId : keyIds) {
+ if (keyId < values.length && values[keyId] != null) {
+ hash += murmur3(values[keyId]);
+ }
+ }
+ return hash & Integer.MAX_VALUE;
+ }
+
+ private static int hashCode(Object[] values, int[] keyIds) {
+ int hash = 0;
+ for (int keyId : keyIds) {
+ if (keyId < values.length && values[keyId] != null) {
+ hash += hashCode(values[keyId]);
+ }
+ }
+ return hash & Integer.MAX_VALUE;
+ }
+
+ private static byte[] toBytes(Object value) {
+ return value.toString().getBytes(StandardCharsets.UTF_8);
+ }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelectorFactory.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelectorFactory.java
index b70615f68d0..6e0a0159b6e 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelectorFactory.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelectorFactory.java
@@ -26,17 +26,21 @@ public class KeySelectorFactory {
}
public static KeySelector<?> getKeySelector(List<Integer> keyIds) {
+ return getKeySelector(keyIds, KeySelector.DEFAULT_HASH_ALGORITHM);
+ }
+
+ public static KeySelector<?> getKeySelector(List<Integer> keyIds, String
hashFunction) {
int numKeys = keyIds.size();
if (numKeys == 0) {
- return EmptyKeySelector.INSTANCE;
+ return EmptyKeySelector.getInstance(hashFunction);
} else if (numKeys == 1) {
- return new SingleColumnKeySelector(keyIds.get(0));
+ return new SingleColumnKeySelector(keyIds.get(0), hashFunction);
} else {
int[] ids = new int[numKeys];
for (int i = 0; i < numKeys; i++) {
ids[i] = keyIds.get(i);
}
- return new MultiColumnKeySelector(ids);
+ return new MultiColumnKeySelector(ids, hashFunction);
}
}
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/MultiColumnKeySelector.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/MultiColumnKeySelector.java
index c510e5b7014..51fe01668f3 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/MultiColumnKeySelector.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/MultiColumnKeySelector.java
@@ -23,9 +23,15 @@ import org.apache.pinot.core.data.table.Key;
public class MultiColumnKeySelector implements KeySelector<Key> {
private final int[] _keyIds;
+ private final String _hashFunction;
public MultiColumnKeySelector(int[] keyIds) {
+ this(keyIds, KeySelector.DEFAULT_HASH_ALGORITHM);
+ }
+
+ public MultiColumnKeySelector(int[] keyIds, String hashFunction) {
_keyIds = keyIds;
+ _hashFunction = hashFunction;
}
@Override
@@ -56,15 +62,12 @@ public class MultiColumnKeySelector implements
KeySelector<Key> {
// also see: https://github.com/apache/pinot/issues/9998
//
// TODO: consider better hashing algorithms than hashCode sum, such as
XOR'ing
- int hashCode = 0;
- for (int keyId : _keyIds) {
- Object value = input[keyId];
- if (value != null) {
- hashCode += value.hashCode();
- }
- }
-
// return a positive number because this is used directly to modulo-index
- return hashCode & Integer.MAX_VALUE;
+ return HashFunctionSelector.computeMultiHash(input, _keyIds,
_hashFunction);
+ }
+
+ @Override
+ public String hashAlgorithm() {
+ return _hashFunction;
}
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/SingleColumnKeySelector.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/SingleColumnKeySelector.java
index 071078d3b60..3d19523941c 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/SingleColumnKeySelector.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/SingleColumnKeySelector.java
@@ -23,9 +23,15 @@ import javax.annotation.Nullable;
public class SingleColumnKeySelector implements KeySelector<Object> {
private final int _keyId;
+ private final String _hashFunction;
public SingleColumnKeySelector(int keyId) {
+ this(keyId, KeySelector.DEFAULT_HASH_ALGORITHM);
+ }
+
+ public SingleColumnKeySelector(int keyId, String hashFunction) {
_keyId = keyId;
+ _hashFunction = hashFunction;
}
@Nullable
@@ -37,6 +43,11 @@ public class SingleColumnKeySelector implements
KeySelector<Object> {
@Override
public int computeHash(Object[] input) {
Object key = input[_keyId];
- return key != null ? key.hashCode() & Integer.MAX_VALUE : 0;
+ return HashFunctionSelector.computeHash(key, _hashFunction);
+ }
+
+ @Override
+ public String hashAlgorithm() {
+ return _hashFunction;
}
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelToPlanNodeConverter.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelToPlanNodeConverter.java
index 8dc8c81dc9d..26b395b4648 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelToPlanNodeConverter.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelToPlanNodeConverter.java
@@ -49,6 +49,7 @@ import org.apache.pinot.common.utils.DatabaseUtils;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.logical.RexExpressionUtils;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalAggregate;
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalAsOfJoin;
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalExchange;
@@ -117,7 +118,8 @@ public class PRelToPlanNodeConverter {
return new ExchangeNode(DEFAULT_STAGE_ID, toDataSchema(node.getRowType()),
new ArrayList<>(), node.getRelExchangeType(),
RelDistribution.Type.ANY, node.getDistributionKeys(),
false, node.getRelCollation().getFieldCollations(), false,
- !node.getRelCollation().getKeys().isEmpty(), Set.of() /* table names
*/, node.getExchangeStrategy());
+ !node.getRelCollation().getKeys().isEmpty(), Set.of() /* table names
*/, node.getExchangeStrategy(),
+ KeySelector.DEFAULT_HASH_ALGORITHM);
}
public static SetOpNode convertSetOp(SetOp node) {
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java
index ea02ca6b7ef..a9e8bf0b4a9 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java
@@ -48,11 +48,12 @@ public class ExchangeNode extends BasePlanNode {
private final Set<String> _tableNames;
@Nullable
private final ExchangeStrategy _exchangeStrategy;
+ private final String _hashFunction;
public ExchangeNode(int stageId, DataSchema dataSchema, List<PlanNode>
inputs, PinotRelExchangeType exchangeType,
RelDistribution.Type distributionType, @Nullable List<Integer> keys,
boolean prePartitioned,
@Nullable List<RelFieldCollation> collations, boolean sortOnSender,
boolean sortOnReceiver,
- @Nullable Set<String> tableNames, ExchangeStrategy exchangeStrategy) {
+ @Nullable Set<String> tableNames, ExchangeStrategy exchangeStrategy,
String hashFunction) {
super(stageId, dataSchema, null, inputs);
_exchangeType = exchangeType;
_distributionType = distributionType;
@@ -63,6 +64,7 @@ public class ExchangeNode extends BasePlanNode {
_sortOnReceiver = sortOnReceiver;
_tableNames = tableNames;
_exchangeStrategy = exchangeStrategy;
+ _hashFunction = hashFunction;
}
public PinotRelExchangeType getExchangeType() {
@@ -105,6 +107,10 @@ public class ExchangeNode extends BasePlanNode {
return _exchangeStrategy;
}
+ public String getHashFunction() {
+ return _hashFunction;
+ }
+
@Override
public String explain() {
return "EXCHANGE";
@@ -118,7 +124,7 @@ public class ExchangeNode extends BasePlanNode {
@Override
public PlanNode withInputs(List<PlanNode> inputs) {
return new ExchangeNode(_stageId, _dataSchema, inputs, _exchangeType,
_distributionType, _keys, _prePartitioned,
- _collations, _sortOnSender, _sortOnReceiver, _tableNames, null);
+ _collations, _sortOnSender, _sortOnReceiver, _tableNames, null,
_hashFunction);
}
@Override
@@ -135,13 +141,14 @@ public class ExchangeNode extends BasePlanNode {
ExchangeNode that = (ExchangeNode) o;
return _sortOnSender == that._sortOnSender && _sortOnReceiver ==
that._sortOnReceiver
&& _prePartitioned == that._prePartitioned && _exchangeType ==
that._exchangeType
- && _distributionType == that._distributionType &&
Objects.equals(_keys, that._keys) && Objects.equals(
- _collations, that._collations) && Objects.equals(_tableNames,
that._tableNames);
+ && _distributionType == that._distributionType &&
Objects.equals(_keys, that._keys)
+ && Objects.equals(_collations, that._collations) &&
Objects.equals(_tableNames, that._tableNames)
+ && Objects.equals(_hashFunction, that._hashFunction);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), _exchangeType, _distributionType,
_keys, _sortOnSender, _sortOnReceiver,
- _prePartitioned, _collations, _tableNames);
+ _prePartitioned, _collations, _tableNames, _hashFunction);
}
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java
index c40fa50b000..ca481f3923a 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java
@@ -28,6 +28,7 @@ import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
public class MailboxSendNode extends BasePlanNode {
@@ -38,12 +39,13 @@ public class MailboxSendNode extends BasePlanNode {
private final boolean _prePartitioned;
private final List<RelFieldCollation> _collations;
private final boolean _sort;
+ private final String _hashFunction;
// NOTE: null List is converted to empty List because there is no way to
differentiate them in proto during ser/de.
private MailboxSendNode(int stageId, DataSchema dataSchema, List<PlanNode>
inputs,
BitSet receiverStages, PinotRelExchangeType exchangeType,
RelDistribution.Type distributionType, @Nullable List<Integer> keys,
boolean prePartitioned,
- @Nullable List<RelFieldCollation> collations, boolean sort) {
+ @Nullable List<RelFieldCollation> collations, boolean sort, String
hashFunction) {
super(stageId, dataSchema, null, inputs);
_receiverStages = receiverStages;
_exchangeType = exchangeType;
@@ -52,14 +54,15 @@ public class MailboxSendNode extends BasePlanNode {
_prePartitioned = prePartitioned;
_collations = collations != null ? collations : List.of();
_sort = sort;
+ _hashFunction = hashFunction;
}
public MailboxSendNode(int stageId, DataSchema dataSchema, List<PlanNode>
inputs,
@Nullable List<Integer> receiverStages, PinotRelExchangeType
exchangeType,
RelDistribution.Type distributionType, @Nullable List<Integer> keys,
boolean prePartitioned,
- @Nullable List<RelFieldCollation> collations, boolean sort) {
+ @Nullable List<RelFieldCollation> collations, boolean sort, String
hashFunction) {
this(stageId, dataSchema, inputs, toBitSet(receiverStages), exchangeType,
- distributionType, keys, prePartitioned, collations, sort);
+ distributionType, keys, prePartitioned, collations, sort,
hashFunction);
}
public MailboxSendNode(int stageId, DataSchema dataSchema, List<PlanNode>
inputs,
@@ -67,7 +70,15 @@ public class MailboxSendNode extends BasePlanNode {
RelDistribution.Type distributionType, @Nullable List<Integer> keys,
boolean prePartitioned,
@Nullable List<RelFieldCollation> collations, boolean sort) {
this(stageId, dataSchema, inputs, toBitSet(receiverStage), exchangeType,
distributionType, keys, prePartitioned,
- collations, sort);
+ collations, sort, KeySelector.DEFAULT_HASH_ALGORITHM);
+ }
+
+ public MailboxSendNode(int stageId, DataSchema dataSchema, List<PlanNode>
inputs,
+ int receiverStage, PinotRelExchangeType exchangeType,
+ RelDistribution.Type distributionType, @Nullable List<Integer> keys,
boolean prePartitioned,
+ @Nullable List<RelFieldCollation> collations, boolean sort, String
hashFunction) {
+ this(stageId, dataSchema, inputs, toBitSet(receiverStage), exchangeType,
distributionType, keys, prePartitioned,
+ collations, sort, hashFunction);
}
private static BitSet toBitSet(int receiverStage) {
@@ -87,13 +98,6 @@ public class MailboxSendNode extends BasePlanNode {
return bitSet;
}
- public MailboxSendNode(int stageId, DataSchema dataSchema, List<PlanNode>
inputs,
- PinotRelExchangeType exchangeType, RelDistribution.Type
distributionType, @Nullable List<Integer> keys,
- boolean prePartitioned, @Nullable List<RelFieldCollation> collations,
boolean sort) {
- this(stageId, dataSchema, inputs, new BitSet(), exchangeType,
distributionType, keys, prePartitioned, collations,
- sort);
- }
-
public boolean sharesReceiverStages(MailboxSendNode other) {
return _receiverStages.intersects(other._receiverStages);
}
@@ -167,6 +171,10 @@ public class MailboxSendNode extends BasePlanNode {
return _sort;
}
+ public String getHashFunction() {
+ return _hashFunction;
+ }
+
@Override
public String explain() {
StringBuilder sb = new StringBuilder();
@@ -190,7 +198,7 @@ public class MailboxSendNode extends BasePlanNode {
@Override
public PlanNode withInputs(List<PlanNode> inputs) {
return new MailboxSendNode(_stageId, _dataSchema, inputs, _receiverStages,
_exchangeType, _distributionType, _keys,
- _prePartitioned, _collations, _sort);
+ _prePartitioned, _collations, _sort, _hashFunction);
}
@Override
@@ -207,13 +215,14 @@ public class MailboxSendNode extends BasePlanNode {
MailboxSendNode that = (MailboxSendNode) o;
return Objects.equals(_receiverStages, that._receiverStages) &&
_prePartitioned == that._prePartitioned
&& _sort == that._sort && _exchangeType == that._exchangeType &&
_distributionType == that._distributionType
- && Objects.equals(_keys, that._keys) && Objects.equals(_collations,
that._collations);
+ && Objects.equals(_keys, that._keys) && Objects.equals(_collations,
that._collations)
+ && Objects.equals(_hashFunction, that._hashFunction);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), _receiverStages, _exchangeType,
_distributionType, _keys, _prePartitioned,
- _collations, _sort);
+ _collations, _sort, _hashFunction);
}
@Override
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java
index 9cf5cd80000..b9701323471 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java
@@ -32,6 +32,7 @@ import org.apache.pinot.common.proto.Plan;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.planner.plannode.AggregateNode;
import org.apache.pinot.query.planner.plannode.ExplainedNode;
import org.apache.pinot.query.planner.plannode.FilterNode;
@@ -129,12 +130,16 @@ public class PlanNodeDeserializer {
} else {
receiverIds = protoReceiverIds;
}
+ String hashFunction = protoMailboxSendNode.getHashFunction();
+ if (hashFunction == null || hashFunction.isEmpty()) {
+ hashFunction = KeySelector.DEFAULT_HASH_ALGORITHM;
+ }
return new MailboxSendNode(protoNode.getStageId(),
extractDataSchema(protoNode), extractInputs(protoNode),
receiverIds,
convertExchangeType(protoMailboxSendNode.getExchangeType()),
convertDistributionType(protoMailboxSendNode.getDistributionType()),
protoMailboxSendNode.getKeysList(),
protoMailboxSendNode.getPrePartitioned(),
convertCollations(protoMailboxSendNode.getCollationsList()),
- protoMailboxSendNode.getSort());
+ protoMailboxSendNode.getSort(), hashFunction);
}
private static ProjectNode deserializeProjectNode(Plan.PlanNode protoNode) {
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java
index 359b3895eef..c23f3bfbd4d 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java
@@ -161,6 +161,7 @@ public class PlanNodeSerializer {
.setDistributionType(convertDistributionType(node.getDistributionType()))
.addAllKeys(node.getKeys())
.setPrePartitioned(node.isPrePartitioned())
+ .setHashFunction(node.getHashFunction())
.addAllCollations(convertCollations(node.getCollations()))
.setSort(node.isSort())
.build();
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/logical/StagesTestBase.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/logical/StagesTestBase.java
index 3735a829766..021e65fa765 100644
---
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/logical/StagesTestBase.java
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/logical/StagesTestBase.java
@@ -34,6 +34,7 @@ import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.planner.plannode.JoinNode;
import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
import org.apache.pinot.query.planner.plannode.MailboxSendNode;
@@ -380,8 +381,8 @@ public class StagesTestBase {
PlanNode input = _childBuilder.build(_senderStageId);
DataSchema mySchema = input.getDataSchema();
- _sender = new MailboxSendNode(_senderStageId, mySchema, List.of(input),
null,
- null, null, false, null, false);
+ _sender = new MailboxSendNode(_senderStageId, mySchema, List.of(input),
0, null,
+ null, null, false, null, false, KeySelector.DEFAULT_HASH_ALGORITHM);
}
/**
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/partitioning/HashFunctionSelectorTest.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/partitioning/HashFunctionSelectorTest.java
new file mode 100644
index 00000000000..d6d6a2209cf
--- /dev/null
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/partitioning/HashFunctionSelectorTest.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.partitioning;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Test for {@link HashFunctionSelector}
+ */
+public class HashFunctionSelectorTest {
+
+ @Test
+ public void testAbsHashCode() {
+ String value = "test";
+ int hash1 = HashFunctionSelector.computeHash(value, "abshashcode");
+ int hash2 = HashFunctionSelector.computeHash(value, "abshashcode");
+
+ // Same input should produce same hash
+ Assert.assertEquals(hash1, hash2);
+
+ // Should be non-negative (assertion permits 0)
+ Assert.assertTrue(hash1 >= 0);
+ }
+
+ @Test
+ public void testMurmur2() {
+ String value = "test";
+ int hash1 = HashFunctionSelector.computeHash(value, "murmur");
+ int hash2 = HashFunctionSelector.computeHash(value, "murmur");
+
+ // Same input should produce same hash
+ Assert.assertEquals(hash1, hash2);
+
+ // Should be non-negative (assertion permits 0)
+ Assert.assertTrue(hash1 >= 0);
+
+ // Should be different from absHashCode
+ int absHash = HashFunctionSelector.computeHash(value, "abshashcode");
+ Assert.assertNotEquals(hash1, absHash);
+ }
+
+ @Test
+ public void testMurmur3() {
+ String value = "test";
+ int hash1 = HashFunctionSelector.computeHash(value, "murmur3");
+ int hash2 = HashFunctionSelector.computeHash(value, "murmur3");
+
+ // Same input should produce same hash
+ Assert.assertEquals(hash1, hash2);
+
+ // Should be positive
+ Assert.assertTrue(hash1 >= 0);
+
+ // Should be different from other hash functions
+ int absHash = HashFunctionSelector.computeHash(value, "abshashcode");
+ int murmur2Hash = HashFunctionSelector.computeHash(value, "murmur");
+ Assert.assertNotEquals(hash1, absHash);
+ Assert.assertNotEquals(hash1, murmur2Hash);
+ }
+
+ @Test
+ public void testHashCode() {
+ String value = "test";
+ int hash1 = HashFunctionSelector.computeHash(value, "hashcode");
+ int hash2 = HashFunctionSelector.computeHash(value, "hashcode");
+
+ // Same input should produce same hash
+ Assert.assertEquals(hash1, hash2);
+
+ // Should be positive
+ Assert.assertTrue(hash1 >= 0);
+
+ // Should be different from murmur and murmur3 but same as absHashCode
+ int absHash = HashFunctionSelector.computeHash(value, "abshashcode");
+ int murmur2Hash = HashFunctionSelector.computeHash(value, "murmur");
+ int murmur3Hash = HashFunctionSelector.computeHash(value, "murmur3");
+ Assert.assertEquals(hash1, absHash);
+ Assert.assertNotEquals(hash1, murmur2Hash);
+ Assert.assertNotEquals(hash1, murmur3Hash);
+ }
+
+ @Test
+ public void testNullValue() {
+ // Null values should return 0 for all hash functions
+ Assert.assertEquals(HashFunctionSelector.computeHash(null, "abshashcode"),
0);
+ Assert.assertEquals(HashFunctionSelector.computeHash(null, "murmur"), 0);
+ Assert.assertEquals(HashFunctionSelector.computeHash(null, "murmur3"), 0);
+ Assert.assertEquals(HashFunctionSelector.computeHash(null, "cityhash"), 0);
+ }
+
+ @Test
+ public void testUnknownHashFunction() {
+ String value = "test";
+ // Unknown hash function should default to absHashCode
+ int hash = HashFunctionSelector.computeHash(value, "unknown");
+ int expectedHash = HashFunctionSelector.computeHash(value, "abshashcode");
+ Assert.assertEquals(hash, expectedHash);
+ }
+
+ @Test
+ public void testCaseInsensitive() {
+ String value = "test";
+ int hash1 = HashFunctionSelector.computeHash(value, "MURMUR");
+ int hash2 = HashFunctionSelector.computeHash(value, "murmur");
+ Assert.assertEquals(hash1, hash2);
+ }
+}
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/partitioning/KeySelectorHashFunctionTest.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/partitioning/KeySelectorHashFunctionTest.java
new file mode 100644
index 00000000000..83a1eed38ff
--- /dev/null
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/partitioning/KeySelectorHashFunctionTest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.partitioning;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Test for KeySelector implementations with custom hash functions
+ */
+public class KeySelectorHashFunctionTest {
+
+ @Test
+ public void testSingleColumnKeySelectorWithCustomHashFunction() {
+ SingleColumnKeySelector selector = new SingleColumnKeySelector(0,
"murmur");
+
+ Object[] row = {"test"};
+ int hash = selector.computeHash(row);
+
+ // Should be non-negative (assertion permits 0)
+ Assert.assertTrue(hash >= 0);
+
+ // Should use the specified hash function
+ Assert.assertEquals(selector.hashAlgorithm(), "murmur");
+
+ // Same input should produce same hash
+ int hash2 = selector.computeHash(row);
+ Assert.assertEquals(hash, hash2);
+ }
+
+ @Test
+ public void testMultiColumnKeySelectorWithCustomHashFunction() {
+ MultiColumnKeySelector selector = new MultiColumnKeySelector(new int[]{0,
1}, "murmur3");
+
+ Object[] row = {"test1", "test2"};
+ int hash = selector.computeHash(row);
+
+ // Should be non-negative (assertion permits 0)
+ Assert.assertTrue(hash >= 0);
+
+ // Should use the specified hash function
+ Assert.assertEquals(selector.hashAlgorithm(), "murmur3");
+
+ // Same input should produce same hash
+ int hash2 = selector.computeHash(row);
+ Assert.assertEquals(hash, hash2);
+ }
+
+ @Test
+ public void testEmptyKeySelectorWithCustomHashFunction() {
+ EmptyKeySelector selector = EmptyKeySelector.getInstance("hashcode");
+
+ Object[] row = {"test"};
+ int hash = selector.computeHash(row);
+
+ // Should always return 0
+ Assert.assertEquals(hash, 0);
+
+ // Should use the specified hash function
+ Assert.assertEquals(selector.hashAlgorithm(), "hashcode");
+ }
+
+ @Test
+ public void testKeySelectorFactoryWithCustomHashFunction() {
+ // Test single column
+ KeySelector<?> singleSelector =
KeySelectorFactory.getKeySelector(java.util.List.of(0), "murmur");
+ Assert.assertEquals(singleSelector.hashAlgorithm(), "murmur");
+
+ // Test multi column
+ KeySelector<?> multiSelector =
KeySelectorFactory.getKeySelector(java.util.List.of(0, 1), "murmur3");
+ Assert.assertEquals(multiSelector.hashAlgorithm(), "murmur3");
+
+ // Test empty
+ KeySelector<?> emptySelector =
KeySelectorFactory.getKeySelector(java.util.List.of(), "hashcode");
+ Assert.assertEquals(emptySelector.hashAlgorithm(), "hashcode");
+ }
+
+ @Test
+ public void testKeySelectorFactoryWithDefaultHashFunction() {
+ // Test single column
+ KeySelector<?> singleSelector =
KeySelectorFactory.getKeySelector(java.util.List.of(0));
+ Assert.assertEquals(singleSelector.hashAlgorithm(),
KeySelector.DEFAULT_HASH_ALGORITHM);
+
+ // Test multi column
+ KeySelector<?> multiSelector =
KeySelectorFactory.getKeySelector(java.util.List.of(0, 1));
+ Assert.assertEquals(multiSelector.hashAlgorithm(),
KeySelector.DEFAULT_HASH_ALGORITHM);
+
+ // Test empty
+ KeySelector<?> emptySelector =
KeySelectorFactory.getKeySelector(java.util.List.of());
+ Assert.assertEquals(emptySelector.hashAlgorithm(),
KeySelector.DEFAULT_HASH_ALGORITHM);
+ }
+}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 6c223e861f1..e2a1009d53f 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -85,7 +85,7 @@ public class MailboxSendOperator extends MultiStageOperator {
* Creates a {@link BlockExchange} for the given {@link MailboxSendNode}.
*
* In normal cases, where the sender sends data to a single receiver stage,
this method just delegates on
- * {@link #getBlockExchange(OpChainExecutionContext, int,
RelDistribution.Type, List, StatMap, BlockSplitter)}.
+ * {@link #getBlockExchange(OpChainExecutionContext, int, MailboxSendNode,
StatMap, BlockSplitter)}.
*
* In case of a multi-sender node, this method creates a two steps exchange:
* <ol>
@@ -102,20 +102,20 @@ public class MailboxSendOperator extends
MultiStageOperator {
if (!node.isMultiSend()) {
// it is guaranteed that there is exactly one receiver stage
int receiverStageId = node.getReceiverStageIds().iterator().next();
- return getBlockExchange(ctx, receiverStageId,
node.getDistributionType(), node.getKeys(), statMap, mainSplitter);
+ return getBlockExchange(ctx, receiverStageId, node, statMap,
mainSplitter);
}
List<SendingMailbox> perStageSendingMailboxes = new ArrayList<>();
// The inner splitter is a NO_OP because the outer splitter will take care
of splitting the blocks
BlockSplitter innerSplitter = BlockSplitter.NO_OP;
for (int receiverStageId : node.getReceiverStageIds()) {
BlockExchange blockExchange =
- getBlockExchange(ctx, receiverStageId, node.getDistributionType(),
node.getKeys(), statMap, innerSplitter);
+ getBlockExchange(ctx, receiverStageId, node, statMap, innerSplitter);
perStageSendingMailboxes.add(blockExchange.asSendingMailbox(Integer.toString(receiverStageId)));
}
Function<List<SendingMailbox>, Integer> statsIndexChooser =
getStatsIndexChooser(ctx, node);
return BlockExchange.getExchange(perStageSendingMailboxes,
RelDistribution.Type.BROADCAST_DISTRIBUTED,
- Collections.emptyList(), mainSplitter, statsIndexChooser);
+ Collections.emptyList(), mainSplitter, statsIndexChooser,
node.getHashFunction());
}
private static Function<List<SendingMailbox>, Integer>
getStatsIndexChooser(OpChainExecutionContext ctx,
@@ -154,7 +154,8 @@ public class MailboxSendOperator extends MultiStageOperator
{
* In case of a multi-sender node, this method will be called for each
receiver stage.
*/
private static BlockExchange getBlockExchange(OpChainExecutionContext
context, int receiverStageId,
- RelDistribution.Type distributionType, List<Integer> keys,
StatMap<StatKey> statMap, BlockSplitter splitter) {
+ MailboxSendNode node, StatMap<StatKey> statMap, BlockSplitter splitter) {
+ RelDistribution.Type distributionType = node.getDistributionType();
Preconditions.checkState(SUPPORTED_EXCHANGE_TYPES.contains(distributionType),
"Unsupported distribution type: %s",
distributionType);
MailboxService mailboxService = context.getMailboxService();
@@ -172,7 +173,8 @@ public class MailboxSendOperator extends MultiStageOperator
{
.map(v -> mailboxService.getSendingMailbox(v.getHostname(),
v.getPort(), v.getMailboxId(), deadlineMs, statMap))
.collect(Collectors.toList());
statMap.merge(StatKey.FAN_OUT, sendingMailboxes.size());
- return BlockExchange.getExchange(sendingMailboxes, distributionType, keys,
splitter);
+ return BlockExchange.getExchange(sendingMailboxes, distributionType,
node.getKeys(), splitter,
+ node.getHashFunction());
}
@Override
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
index db30427525c..2518a31b761 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
@@ -63,13 +63,15 @@ public abstract class BlockExchange {
* mailbox index that receives the stats should be tuned.
* @param statsIndexChooser a function to choose the mailbox index to send
stats to.
*/
- public static BlockExchange getExchange(List<SendingMailbox>
sendingMailboxes, RelDistribution.Type distributionType,
- List<Integer> keys, BlockSplitter splitter,
Function<List<SendingMailbox>, Integer> statsIndexChooser) {
+ public static BlockExchange getExchange(List<SendingMailbox>
sendingMailboxes,
+ RelDistribution.Type distributionType, List<Integer> keys, BlockSplitter
splitter,
+ Function<List<SendingMailbox>, Integer> statsIndexChooser, String
hashFunction) {
switch (distributionType) {
case SINGLETON:
return new SingletonExchange(sendingMailboxes, splitter,
statsIndexChooser);
case HASH_DISTRIBUTED:
- return new HashExchange(sendingMailboxes,
KeySelectorFactory.getKeySelector(keys), splitter, statsIndexChooser);
+ return new HashExchange(sendingMailboxes,
KeySelectorFactory.getKeySelector(keys, hashFunction), splitter,
+ statsIndexChooser);
case RANDOM_DISTRIBUTED:
return new RandomExchange(sendingMailboxes, splitter,
statsIndexChooser);
case BROADCAST_DISTRIBUTED:
@@ -83,8 +85,8 @@ public abstract class BlockExchange {
}
public static BlockExchange getExchange(List<SendingMailbox>
sendingMailboxes, RelDistribution.Type distributionType,
- List<Integer> keys, BlockSplitter splitter) {
- return getExchange(sendingMailboxes, distributionType, keys, splitter,
RANDOM_INDEX_CHOOSER);
+ List<Integer> keys, BlockSplitter splitter, String hashFunction) {
+ return getExchange(sendingMailboxes, distributionType, keys, splitter,
RANDOM_INDEX_CHOOSER, hashFunction);
}
protected BlockExchange(List<SendingMailbox> sendingMailboxes, BlockSplitter
splitter,
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 290cf20c231..d6de4b9ba22 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -533,6 +533,10 @@ public class CommonConstants {
"pinot.broker.multistage.lite.mode.leaf.stage.limit";
public static final int DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT = 100_000;
+ // Config for default hash function used in KeySelector for data shuffling
+ public static final String CONFIG_OF_BROKER_DEFAULT_HASH_FUNCTION =
"pinot.broker.multistage.default.hash.function";
+ public static final String DEFAULT_BROKER_DEFAULT_HASH_FUNCTION =
"absHashCode";
+
// When the server instance's pool field is null or the pool contains
multi distinguished group value, the broker
// would set the pool to -1 in the routing table for that server.
public static final int FALLBACK_POOL_ID = -1;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]