andygrove commented on code in PR #1485:
URL: https://github.com/apache/datafusion-comet/pull/1485#discussion_r1988060641
##########
spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java:
##########
@@ -48,30 +48,30 @@ public final class CometShuffleMemoryAllocator extends CometShuffleMemoryAllocat
public static CometShuffleMemoryAllocatorTrait getInstance(
SparkConf conf, TaskMemoryManager taskMemoryManager, long pageSize) {
boolean isSparkTesting = Utils.isTesting();
- boolean useUnifiedMemAllocator =
+ boolean useTestAllocator =
(boolean)
CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_UNIFIED_MEMORY_ALLOCATOR_IN_TEST().get();
- if (!useUnifiedMemAllocator) {
+ if (isSparkTesting || useTestAllocator) {
synchronized (CometShuffleMemoryAllocator.class) {
if (INSTANCE == null) {
// CometTestShuffleMemoryAllocator handles pages by itself so it can be a singleton.
INSTANCE = new CometTestShuffleMemoryAllocator(conf, taskMemoryManager, pageSize);
}
}
return INSTANCE;
- } else {
- if (taskMemoryManager.getTungstenMemoryMode() != MemoryMode.OFF_HEAP) {
- throw new IllegalArgumentException(
- "CometShuffleMemoryAllocator should be used with off-heap "
- + "memory mode, but got "
- + taskMemoryManager.getTungstenMemoryMode());
- }
+ }
- // CometShuffleMemoryAllocator stores pages in TaskMemoryManager which is not singleton,
- // but one instance per task. So we need to create a new instance for each task.
- return new CometShuffleMemoryAllocator(taskMemoryManager, pageSize);
+ if (taskMemoryManager.getTungstenMemoryMode() != MemoryMode.OFF_HEAP) {
Review Comment:
I tested with TPC-H with off-heap memory disabled and ran into this exception:
```
Caused by: java.lang.IllegalArgumentException: CometShuffleMemoryAllocator should be used with off-heap memory mode, but got ON_HEAP
    at org.apache.spark.shuffle.comet.CometShuffleMemoryAllocator.getInstance(CometShuffleMemoryAllocator.java:74)
    at org.apache.spark.sql.comet.execution.shuffle.CometDiskBlockWriter.<init>(CometDiskBlockWriter.java:142)
    at org.apache.spark.sql.comet.execution.shuffle.CometBypassMergeSortShuffleWriter.write(CometBypassMergeSortShuffleWriter.java:181)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
```
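For reference, here is a minimal standalone sketch of the selection logic as it reads in the patched hunk. It is not the Comet code: `AllocatorSelectionSketch`, `selectAllocator`, and the local `MemoryMode` enum are hypothetical stand-ins, and it assumes the `throw` after the off-heap check is unchanged from the removed lines, which the message in the stack trace suggests. Under those assumptions, a run that is neither a Spark test nor configured for the test allocator falls through to the off-heap check and is rejected when the memory mode is `ON_HEAP`.

```java
class AllocatorSelectionSketch {
    // Hypothetical stand-in for Spark's MemoryMode enum.
    enum MemoryMode { ON_HEAP, OFF_HEAP }

    // Mirrors the control flow of the patched getInstance(): the test allocator
    // is chosen only when testing or explicitly configured; every other caller
    // must be running in off-heap mode. Returns the allocator name for illustration.
    static String selectAllocator(
            boolean isSparkTesting, boolean useTestAllocator, MemoryMode mode) {
        if (isSparkTesting || useTestAllocator) {
            return "CometTestShuffleMemoryAllocator";
        }
        // Assumption: the exception from the removed lines is still thrown here.
        if (mode != MemoryMode.OFF_HEAP) {
            throw new IllegalArgumentException(
                "CometShuffleMemoryAllocator should be used with off-heap "
                    + "memory mode, but got "
                    + mode);
        }
        return "CometShuffleMemoryAllocator";
    }

    public static void main(String[] args) {
        // Test run: the test allocator is used regardless of memory mode.
        System.out.println(selectAllocator(true, false, MemoryMode.ON_HEAP));
        // Production run with off-heap memory: the per-task allocator is used.
        System.out.println(selectAllocator(false, false, MemoryMode.OFF_HEAP));
        // Production run with on-heap memory: the case from the stack trace.
        try {
            selectAllocator(false, false, MemoryMode.ON_HEAP);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The third call in `main` corresponds to the TPC-H run above: neither flag is set and the mode is `ON_HEAP`, so the sketch takes the exception path.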